Merge pull request #1274 from aleiby/fix/1272-sqlite-retry-logic
fix(sqlite): add retry logic to transaction entry points
This commit is contained in:
@@ -273,8 +273,9 @@ func (s *SQLiteStorage) CreateIssuesWithFullOptions(ctx context.Context, issues
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
// Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load
|
||||
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
|
||||
// Start IMMEDIATE transaction to acquire write lock early.
|
||||
// The connection's busy_timeout pragma (30s) handles retries if locked.
|
||||
if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil {
|
||||
return fmt.Errorf("failed to begin immediate transaction: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -274,14 +274,14 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti
|
||||
// This sets compaction_level, compacted_at, compacted_at_commit, and original_size fields.
|
||||
func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
var commitHashPtr *string
|
||||
if commitHash != "" {
|
||||
commitHashPtr = &commitHash
|
||||
}
|
||||
|
||||
res, err := tx.ExecContext(ctx, `
|
||||
|
||||
res, err := conn.ExecContext(ctx, `
|
||||
UPDATE issues
|
||||
SET compaction_level = ?,
|
||||
compacted_at = ?,
|
||||
@@ -290,11 +290,11 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev
|
||||
updated_at = ?
|
||||
WHERE id = ?
|
||||
`, level, now, commitHashPtr, originalSize, now, issueID)
|
||||
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to apply compaction metadata: %w", err)
|
||||
}
|
||||
|
||||
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
@@ -302,24 +302,24 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("issue %s not found", issueID)
|
||||
}
|
||||
|
||||
|
||||
reductionPct := 0.0
|
||||
if originalSize > 0 {
|
||||
reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100
|
||||
}
|
||||
|
||||
|
||||
eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`,
|
||||
level, originalSize, compressedSize, reductionPct)
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, 'compactor', ?)
|
||||
`, issueID, types.EventCompacted, eventData)
|
||||
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record compaction event: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
dep.CreatedBy = actor
|
||||
}
|
||||
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
// Cycle Detection and Prevention
|
||||
//
|
||||
// We prevent cycles across most dependency types to maintain a directed acyclic graph (DAG).
|
||||
@@ -105,7 +105,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
// Skip cycle detection for relates-to (inherently bidirectional)
|
||||
if dep.Type != types.DepRelatesTo {
|
||||
var cycleExists bool
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
err = conn.QueryRowContext(ctx, `
|
||||
WITH RECURSIVE paths AS (
|
||||
SELECT
|
||||
issue_id,
|
||||
@@ -140,24 +140,24 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
}
|
||||
}
|
||||
|
||||
// Insert dependency (including metadata and thread_id for edge consolidation - Decision 004)
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add dependency: %w", err)
|
||||
}
|
||||
// Insert dependency (including metadata and thread_id for edge consolidation - Decision 004)
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add dependency: %w", err)
|
||||
}
|
||||
|
||||
// Record event
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, dep.IssueID, types.EventDependencyAdded, actor,
|
||||
fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
// Record event
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, dep.IssueID, types.EventDependencyAdded, actor,
|
||||
fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark issues as dirty for incremental export
|
||||
// For external refs, only mark the source issue (target doesn't exist locally)
|
||||
@@ -165,14 +165,14 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
if !isExternalRef {
|
||||
issueIDsToMark = append(issueIDsToMark, dep.DependsOnID)
|
||||
}
|
||||
if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil {
|
||||
return wrapDBError("mark issues dirty after adding dependency", err)
|
||||
}
|
||||
|
||||
// Invalidate blocked issues cache since dependencies changed
|
||||
// Only invalidate for types that affect ready work calculation
|
||||
if dep.Type.AffectsReadyWork() {
|
||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
||||
if err := s.invalidateBlockedCache(ctx, conn); err != nil {
|
||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||
}
|
||||
}
|
||||
@@ -183,10 +183,10 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
|
||||
// RemoveDependency removes a dependency
|
||||
func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
// First, check what type of dependency is being removed
|
||||
var depType types.DependencyType
|
||||
err := tx.QueryRowContext(ctx, `
|
||||
err := conn.QueryRowContext(ctx, `
|
||||
SELECT type FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
|
||||
`, issueID, dependsOnID).Scan(&depType)
|
||||
|
||||
@@ -196,7 +196,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
|
||||
needsCacheInvalidation = depType.AffectsReadyWork()
|
||||
}
|
||||
|
||||
result, err := tx.ExecContext(ctx, `
|
||||
result, err := conn.ExecContext(ctx, `
|
||||
DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
|
||||
`, issueID, dependsOnID)
|
||||
if err != nil {
|
||||
@@ -212,7 +212,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
|
||||
return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID)
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, types.EventDependencyRemoved, actor,
|
||||
@@ -227,13 +227,13 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
|
||||
if !strings.HasPrefix(dependsOnID, "external:") {
|
||||
issueIDsToMark = append(issueIDsToMark, dependsOnID)
|
||||
}
|
||||
if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil {
|
||||
return wrapDBError("mark issues dirty after removing dependency", err)
|
||||
}
|
||||
|
||||
// Invalidate blocked issues cache if this was a blocking dependency
|
||||
if needsCacheInvalidation {
|
||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
||||
if err := s.invalidateBlockedCache(ctx, conn); err != nil {
|
||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,9 +31,9 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string)
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
now := time.Now()
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
stmt, err := conn.PrepareContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
@@ -117,8 +117,8 @@ func (s *SQLiteStorage) ClearDirtyIssuesByID(ctx context.Context, issueIDs []str
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`)
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
stmt, err := conn.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare statement: %w", err)
|
||||
}
|
||||
@@ -152,15 +152,16 @@ func (s *SQLiteStorage) GetDirtyIssueCount(ctx context.Context) (int, error) {
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction
|
||||
// This is a helper for operations that need to mark issues dirty as part of a larger transaction
|
||||
func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, issueIDs []string) error {
|
||||
// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction.
|
||||
// This is a helper for operations that need to mark issues dirty as part of a larger transaction.
|
||||
// The exec parameter can be either *sql.Tx or *sql.Conn.
|
||||
func markIssuesDirtyTx(ctx context.Context, exec dbExecutor, issueIDs []string) error {
|
||||
if len(issueIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
stmt, err := exec.PrepareContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
|
||||
@@ -13,10 +13,10 @@ const limitClause = " LIMIT ?"
|
||||
|
||||
// AddComment adds a comment to an issue
|
||||
func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error {
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
// Update issue updated_at timestamp first to verify issue exists
|
||||
now := time.Now()
|
||||
res, err := tx.ExecContext(ctx, `
|
||||
res, err := conn.ExecContext(ctx, `
|
||||
UPDATE issues SET updated_at = ? WHERE id = ?
|
||||
`, now, issueID)
|
||||
if err != nil {
|
||||
@@ -31,7 +31,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment
|
||||
return fmt.Errorf("issue %s not found", issueID)
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, types.EventCommented, actor, comment)
|
||||
@@ -40,7 +40,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment
|
||||
}
|
||||
|
||||
// Mark issue as dirty for incremental export
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
|
||||
@@ -19,8 +19,8 @@ func (s *SQLiteStorage) executeLabelOperation(
|
||||
eventComment string,
|
||||
operationError string,
|
||||
) error {
|
||||
return s.withTx(ctx, func(tx *sql.Tx) error {
|
||||
result, err := tx.ExecContext(ctx, labelSQL, labelSQLArgs...)
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
result, err := conn.ExecContext(ctx, labelSQL, labelSQLArgs...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", operationError, err)
|
||||
}
|
||||
@@ -34,7 +34,7 @@ func (s *SQLiteStorage) executeLabelOperation(
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, eventType, actor, eventComment)
|
||||
@@ -43,7 +43,7 @@ func (s *SQLiteStorage) executeLabelOperation(
|
||||
}
|
||||
|
||||
// Mark issue as dirty for incremental export
|
||||
_, err = tx.ExecContext(ctx, `
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
|
||||
@@ -158,8 +158,8 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
||||
// We use raw Exec instead of BeginTx because database/sql doesn't support transaction
|
||||
// modes in BeginTx, and modernc.org/sqlite's BeginTx always uses DEFERRED mode.
|
||||
//
|
||||
// Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load
|
||||
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
|
||||
// The connection's busy_timeout pragma (30s) handles retries if locked.
|
||||
if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil {
|
||||
return fmt.Errorf("failed to begin immediate transaction: %w", err)
|
||||
}
|
||||
|
||||
@@ -1407,82 +1407,73 @@ func (s *SQLiteStorage) CreateTombstone(ctx context.Context, id string, actor st
|
||||
|
||||
// DeleteIssue permanently removes an issue from the database
|
||||
func (s *SQLiteStorage) DeleteIssue(ctx context.Context, id string) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
// Mark issues that depend on this one as dirty so they get re-exported
|
||||
// without the stale dependency reference (fixes orphan deps in JSONL)
|
||||
rows, err := tx.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query dependent issues: %w", err)
|
||||
}
|
||||
var dependentIDs []string
|
||||
for rows.Next() {
|
||||
var depID string
|
||||
if err := rows.Scan(&depID); err != nil {
|
||||
_ = rows.Close()
|
||||
return fmt.Errorf("failed to scan dependent issue ID: %w", err)
|
||||
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||
// Mark issues that depend on this one as dirty so they get re-exported
|
||||
// without the stale dependency reference (fixes orphan deps in JSONL)
|
||||
rows, err := conn.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query dependent issues: %w", err)
|
||||
}
|
||||
dependentIDs = append(dependentIDs, depID)
|
||||
}
|
||||
_ = rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return fmt.Errorf("failed to iterate dependent issues: %w", err)
|
||||
}
|
||||
|
||||
if len(dependentIDs) > 0 {
|
||||
if err := markIssuesDirtyTx(ctx, tx, dependentIDs); err != nil {
|
||||
return fmt.Errorf("failed to mark dependent issues dirty: %w", err)
|
||||
var dependentIDs []string
|
||||
for rows.Next() {
|
||||
var depID string
|
||||
if err := rows.Scan(&depID); err != nil {
|
||||
_ = rows.Close()
|
||||
return fmt.Errorf("failed to scan dependent issue ID: %w", err)
|
||||
}
|
||||
dependentIDs = append(dependentIDs, depID)
|
||||
}
|
||||
_ = rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return fmt.Errorf("failed to iterate dependent issues: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete dependencies (both directions)
|
||||
_, err = tx.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dependencies: %w", err)
|
||||
}
|
||||
if len(dependentIDs) > 0 {
|
||||
if err := markIssuesDirtyTx(ctx, conn, dependentIDs); err != nil {
|
||||
return fmt.Errorf("failed to mark dependent issues dirty: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete events
|
||||
_, err = tx.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete events: %w", err)
|
||||
}
|
||||
// Delete dependencies (both directions)
|
||||
_, err = conn.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dependencies: %w", err)
|
||||
}
|
||||
|
||||
// Delete comments (no FK cascade on this table)
|
||||
_, err = tx.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete comments: %w", err)
|
||||
}
|
||||
// Delete events
|
||||
_, err = conn.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete events: %w", err)
|
||||
}
|
||||
|
||||
// Delete from dirty_issues
|
||||
_, err = tx.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dirty marker: %w", err)
|
||||
}
|
||||
// Delete comments (no FK cascade on this table)
|
||||
_, err = conn.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete comments: %w", err)
|
||||
}
|
||||
|
||||
// Delete the issue itself
|
||||
result, err := tx.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete issue: %w", err)
|
||||
}
|
||||
// Delete from dirty_issues
|
||||
_, err = conn.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dirty marker: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return fmt.Errorf("issue not found: %s", id)
|
||||
}
|
||||
// Delete the issue itself
|
||||
result, err := conn.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete issue: %w", err)
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return wrapDBError("commit delete transaction", err)
|
||||
}
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return fmt.Errorf("issue not found: %s", id)
|
||||
}
|
||||
|
||||
// REMOVED: Counter sync after deletion - no longer needed with hash IDs
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteIssuesResult contains statistics about a batch deletion operation
|
||||
|
||||
@@ -48,8 +48,9 @@ func (s *SQLiteStorage) RunInTransaction(ctx context.Context, fn func(tx storage
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
// Start IMMEDIATE transaction to acquire write lock early.
|
||||
// Use retry logic with exponential backoff to handle SQLITE_BUSY
|
||||
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
|
||||
// BEGIN IMMEDIATE prevents deadlocks by acquiring the write lock upfront.
|
||||
// The connection's busy_timeout pragma (30s) handles retries if locked.
|
||||
if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// QueryContext exposes the underlying database QueryContext method for advanced queries
|
||||
@@ -19,31 +18,67 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) {
|
||||
}
|
||||
|
||||
// withTx executes a function within a database transaction.
|
||||
// If the function returns an error, the transaction is rolled back.
|
||||
// Otherwise, the transaction is committed.
|
||||
func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
// Uses BEGIN IMMEDIATE to acquire the write lock early, preventing deadlocks
|
||||
// in concurrent scenarios. If the function returns an error, the transaction
|
||||
// is rolled back. Otherwise, the transaction is committed.
|
||||
//
|
||||
// The connection's busy_timeout pragma (30s by default) handles SQLITE_BUSY
|
||||
// retries internally - no additional retry logic is needed here.
|
||||
//
|
||||
// This fixes GH#1272: database lock errors during concurrent operations.
|
||||
func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Conn) error) error {
|
||||
// Acquire a dedicated connection for the transaction.
|
||||
// This ensures all operations in the transaction use the same connection.
|
||||
conn, err := s.db.Conn(ctx)
|
||||
if err != nil {
|
||||
return wrapDBError("acquire connection", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
// Start IMMEDIATE transaction to acquire write lock early.
|
||||
// BEGIN IMMEDIATE prevents deadlocks by acquiring the write lock upfront
|
||||
// rather than upgrading from a read lock later. The connection's
|
||||
// busy_timeout pragma (30s) handles retries if another writer holds the lock.
|
||||
if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil {
|
||||
return wrapDBError("begin transaction", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
if err := fn(tx); err != nil {
|
||||
// Track commit state for cleanup
|
||||
committed := false
|
||||
defer func() {
|
||||
if !committed {
|
||||
// Use background context to ensure rollback completes even if ctx is canceled
|
||||
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
|
||||
}
|
||||
}()
|
||||
|
||||
// Execute user function
|
||||
if err := fn(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
// Commit the transaction
|
||||
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
|
||||
return wrapDBError("commit transaction", err)
|
||||
}
|
||||
|
||||
committed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExecInTransaction is deprecated. Use withTx instead.
|
||||
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Conn) error) error {
|
||||
return s.withTx(ctx, fn)
|
||||
}
|
||||
|
||||
// dbExecutor is an interface satisfied by both *sql.Tx and *sql.Conn.
|
||||
// This allows helper functions to work with either transaction type.
|
||||
type dbExecutor interface {
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
|
||||
}
|
||||
|
||||
// IsUniqueConstraintError checks if an error is a UNIQUE constraint violation
|
||||
func IsUniqueConstraintError(err error) bool {
|
||||
if err == nil {
|
||||
@@ -72,64 +107,3 @@ func IsBusyError(err error) bool {
|
||||
return strings.Contains(errStr, "database is locked") ||
|
||||
strings.Contains(errStr, "SQLITE_BUSY")
|
||||
}
|
||||
|
||||
// beginImmediateWithRetry starts an IMMEDIATE transaction with exponential backoff retry
|
||||
// on SQLITE_BUSY errors. This addresses bd-ola6: under concurrent write load, BEGIN IMMEDIATE
|
||||
// can fail with SQLITE_BUSY, so we retry with exponential backoff instead of failing immediately.
|
||||
//
|
||||
// Parameters:
|
||||
// - ctx: context for cancellation checking
|
||||
// - conn: dedicated database connection (must use same connection for entire transaction)
|
||||
// - maxRetries: maximum number of retry attempts (default: 5)
|
||||
// - initialDelay: initial backoff delay (default: 10ms)
|
||||
//
|
||||
// Returns error if:
|
||||
// - Context is canceled
|
||||
// - BEGIN IMMEDIATE fails with non-busy error
|
||||
// - All retries exhausted with SQLITE_BUSY
|
||||
func beginImmediateWithRetry(ctx context.Context, conn *sql.Conn, maxRetries int, initialDelay time.Duration) error {
|
||||
if maxRetries <= 0 {
|
||||
maxRetries = 5
|
||||
}
|
||||
if initialDelay <= 0 {
|
||||
initialDelay = 10 * time.Millisecond
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
delay := initialDelay
|
||||
|
||||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
||||
// Check context cancellation before each attempt
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Attempt to begin transaction
|
||||
_, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE")
|
||||
if err == nil {
|
||||
return nil // Success
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
|
||||
// If not a busy error, fail immediately
|
||||
if !IsBusyError(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// On last attempt, don't sleep
|
||||
if attempt == maxRetries {
|
||||
break
|
||||
}
|
||||
|
||||
// Exponential backoff: sleep before retry
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
delay *= 2 // Double the delay for next attempt
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
return lastErr // Return the last SQLITE_BUSY error after exhausting retries
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -104,8 +105,8 @@ func TestExecInTransaction(t *testing.T) {
|
||||
defer store.Close()
|
||||
|
||||
t.Run("successful transaction", func(t *testing.T) {
|
||||
err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value")
|
||||
err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error {
|
||||
_, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value")
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
@@ -125,8 +126,8 @@ func TestExecInTransaction(t *testing.T) {
|
||||
|
||||
t.Run("failed transaction rolls back", func(t *testing.T) {
|
||||
expectedErr := errors.New("intentional error")
|
||||
err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value")
|
||||
err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error {
|
||||
_, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -257,21 +258,24 @@ func TestIsBusyError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBeginImmediateWithRetry(t *testing.T) {
|
||||
// TestBeginImmediate tests that BEGIN IMMEDIATE transactions work correctly.
|
||||
// Note: The retry logic was removed because SQLite's busy_timeout pragma (30s)
|
||||
// already handles retries internally. See GH#1272 for details.
|
||||
func TestBeginImmediate(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store := newTestStore(t, t.TempDir()+"/test.db")
|
||||
defer store.Close()
|
||||
|
||||
t.Run("successful on first try", func(t *testing.T) {
|
||||
t.Run("successful BEGIN IMMEDIATE", func(t *testing.T) {
|
||||
conn, err := store.db.Conn(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire connection: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
err = beginImmediateWithRetry(ctx, conn, 5, 10)
|
||||
_, err = conn.ExecContext(ctx, "BEGIN IMMEDIATE")
|
||||
if err != nil {
|
||||
t.Errorf("beginImmediateWithRetry failed: %v", err)
|
||||
t.Errorf("BEGIN IMMEDIATE failed: %v", err)
|
||||
}
|
||||
|
||||
// Rollback to clean up
|
||||
@@ -288,30 +292,14 @@ func TestBeginImmediateWithRetry(t *testing.T) {
|
||||
cancelCtx, cancel := context.WithCancel(ctx)
|
||||
cancel() // Cancel immediately
|
||||
|
||||
err = beginImmediateWithRetry(cancelCtx, conn, 5, 10)
|
||||
_, err = conn.ExecContext(cancelCtx, "BEGIN IMMEDIATE")
|
||||
if err == nil {
|
||||
t.Error("Expected context cancellation error, got nil")
|
||||
t.Error("Expected error due to canceled context, got nil")
|
||||
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
|
||||
}
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Errorf("Expected context.Canceled, got %v", err)
|
||||
// sqlite3 driver returns "interrupted" error rather than wrapping context.Canceled
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "interrupt") {
|
||||
t.Errorf("Expected context cancellation or interrupt error, got %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("defaults for invalid parameters", func(t *testing.T) {
|
||||
conn, err := store.db.Conn(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire connection: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Should use defaults (5 retries, 10ms delay) when passed invalid values
|
||||
err = beginImmediateWithRetry(ctx, conn, 0, 0)
|
||||
if err != nil {
|
||||
t.Errorf("beginImmediateWithRetry with invalid params failed: %v", err)
|
||||
}
|
||||
|
||||
// Rollback to clean up
|
||||
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user