From 718fc49776e47210f8cd1f039178f310410eee9a Mon Sep 17 00:00:00 2001 From: ruby Date: Thu, 22 Jan 2026 18:30:48 -0800 Subject: [PATCH 1/2] fix(sqlite): add retry logic to transaction entry points (GH#1272) Update withTx to use BEGIN IMMEDIATE with exponential backoff retry on SQLITE_BUSY errors. This prevents "database is locked" failures during concurrent operations (daemon + CLI, multi-agent workflows). Changes: - withTx now uses beginImmediateWithRetry (same pattern as RunInTransaction) - Add dbExecutor interface for helper functions that work with both *sql.Tx and *sql.Conn - Update all withTx callers to use *sql.Conn - Refactor DeleteIssue to use withTx (fixes the specific error in auto-import) - Update markIssuesDirtyTx to accept dbExecutor interface Affected paths: - MarkIssuesDirty, ClearDirtyIssuesByID (dirty.go) - AddDependency, RemoveDependency (dependencies.go) - executeLabelOperation (labels.go) - AddComment (events.go) - ApplyCompaction (compact.go) - DeleteIssue (queries.go) Note: Some direct BeginTx calls in queries.go (CloseIssue, UpdateIssue, ReopenIssue, DeleteIssues) still use the old pattern and could be refactored in a follow-up. Co-Authored-By: Claude Opus 4.5 --- internal/storage/sqlite/compact.go | 24 ++--- internal/storage/sqlite/dependencies.go | 54 +++++------ internal/storage/sqlite/dirty.go | 17 ++-- internal/storage/sqlite/events.go | 8 +- internal/storage/sqlite/labels.go | 8 +- internal/storage/sqlite/queries.go | 123 +++++++++++------------- internal/storage/sqlite/util.go | 47 +++++++-- internal/storage/sqlite/util_test.go | 8 +- 8 files changed, 156 insertions(+), 133 deletions(-) diff --git a/internal/storage/sqlite/compact.go b/internal/storage/sqlite/compact.go index 91292ec8..754fa6b8 100644 --- a/internal/storage/sqlite/compact.go +++ b/internal/storage/sqlite/compact.go @@ -274,14 +274,14 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti // This sets compaction_level, compacted_at, compacted_at_commit, and original_size fields. func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error { now := time.Now().UTC() - - return s.withTx(ctx, func(tx *sql.Tx) error { + + return s.withTx(ctx, func(conn *sql.Conn) error { var commitHashPtr *string if commitHash != "" { commitHashPtr = &commitHash } - - res, err := tx.ExecContext(ctx, ` + + res, err := conn.ExecContext(ctx, ` UPDATE issues SET compaction_level = ?, compacted_at = ?, @@ -290,11 +290,11 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev updated_at = ? WHERE id = ? `, level, now, commitHashPtr, originalSize, now, issueID) - + if err != nil { return fmt.Errorf("failed to apply compaction metadata: %w", err) } - + rows, err := res.RowsAffected() if err != nil { return fmt.Errorf("failed to get rows affected: %w", err) @@ -302,24 +302,24 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev if rows == 0 { return fmt.Errorf("issue %s not found", issueID) } - + reductionPct := 0.0 if originalSize > 0 { reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100 } - + eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`, level, originalSize, compressedSize, reductionPct) - - _, err = tx.ExecContext(ctx, ` + + _, err = conn.ExecContext(ctx, ` INSERT INTO events (issue_id, event_type, actor, comment) VALUES (?, ?, 'compactor', ?) `, issueID, types.EventCompacted, eventData) - + if err != nil { return fmt.Errorf("failed to record compaction event: %w", err) } - + return nil }) } diff --git a/internal/storage/sqlite/dependencies.go b/internal/storage/sqlite/dependencies.go index 7ec74bd9..bdfce813 100644 --- a/internal/storage/sqlite/dependencies.go +++ b/internal/storage/sqlite/dependencies.go @@ -75,7 +75,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency dep.CreatedBy = actor } - return s.withTx(ctx, func(tx *sql.Tx) error { + return s.withTx(ctx, func(conn *sql.Conn) error { // Cycle Detection and Prevention // // We prevent cycles across most dependency types to maintain a directed acyclic graph (DAG). @@ -105,7 +105,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency // Skip cycle detection for relates-to (inherently bidirectional) if dep.Type != types.DepRelatesTo { var cycleExists bool - err = tx.QueryRowContext(ctx, ` + err = conn.QueryRowContext(ctx, ` WITH RECURSIVE paths AS ( SELECT issue_id, @@ -140,24 +140,24 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency } } - // Insert dependency (including metadata and thread_id for edge consolidation - Decision 004) - _, err = tx.ExecContext(ctx, ` - INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id) - VALUES (?, ?, ?, ?, ?, ?, ?) - `, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID) - if err != nil { - return fmt.Errorf("failed to add dependency: %w", err) - } + // Insert dependency (including metadata and thread_id for edge consolidation - Decision 004) + _, err = conn.ExecContext(ctx, ` + INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID) + if err != nil { + return fmt.Errorf("failed to add dependency: %w", err) + } - // Record event - _, err = tx.ExecContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, comment) - VALUES (?, ?, ?, ?) - `, dep.IssueID, types.EventDependencyAdded, actor, - fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID)) - if err != nil { - return fmt.Errorf("failed to record event: %w", err) - } + // Record event + _, err = conn.ExecContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, comment) + VALUES (?, ?, ?, ?) + `, dep.IssueID, types.EventDependencyAdded, actor, + fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID)) + if err != nil { + return fmt.Errorf("failed to record event: %w", err) + } // Mark issues as dirty for incremental export // For external refs, only mark the source issue (target doesn't exist locally) @@ -165,14 +165,14 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency if !isExternalRef { issueIDsToMark = append(issueIDsToMark, dep.DependsOnID) } - if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil { + if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil { return wrapDBError("mark issues dirty after adding dependency", err) } // Invalidate blocked issues cache since dependencies changed // Only invalidate for types that affect ready work calculation if dep.Type.AffectsReadyWork() { - if err := s.invalidateBlockedCache(ctx, tx); err != nil { + if err := s.invalidateBlockedCache(ctx, conn); err != nil { return fmt.Errorf("failed to invalidate blocked cache: %w", err) } } @@ -183,10 +183,10 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency // RemoveDependency removes a dependency func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error { - return s.withTx(ctx, func(tx *sql.Tx) error { + return s.withTx(ctx, func(conn *sql.Conn) error { // First, check what type of dependency is being removed var depType types.DependencyType - err := tx.QueryRowContext(ctx, ` + err := conn.QueryRowContext(ctx, ` SELECT type FROM dependencies WHERE issue_id = ? AND depends_on_id = ? `, issueID, dependsOnID).Scan(&depType) @@ -196,7 +196,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn needsCacheInvalidation = depType.AffectsReadyWork() } - result, err := tx.ExecContext(ctx, ` + result, err := conn.ExecContext(ctx, ` DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? `, issueID, dependsOnID) if err != nil { @@ -212,7 +212,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID) } - _, err = tx.ExecContext(ctx, ` + _, err = conn.ExecContext(ctx, ` INSERT INTO events (issue_id, event_type, actor, comment) VALUES (?, ?, ?, ?) `, issueID, types.EventDependencyRemoved, actor, @@ -227,13 +227,13 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn if !strings.HasPrefix(dependsOnID, "external:") { issueIDsToMark = append(issueIDsToMark, dependsOnID) } - if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil { + if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil { return wrapDBError("mark issues dirty after removing dependency", err) } // Invalidate blocked issues cache if this was a blocking dependency if needsCacheInvalidation { - if err := s.invalidateBlockedCache(ctx, tx); err != nil { + if err := s.invalidateBlockedCache(ctx, conn); err != nil { return fmt.Errorf("failed to invalidate blocked cache: %w", err) } } diff --git a/internal/storage/sqlite/dirty.go b/internal/storage/sqlite/dirty.go index 1bee474f..9326a323 100644 --- a/internal/storage/sqlite/dirty.go +++ b/internal/storage/sqlite/dirty.go @@ -31,9 +31,9 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string) return nil } - return s.withTx(ctx, func(tx *sql.Tx) error { + return s.withTx(ctx, func(conn *sql.Conn) error { now := time.Now() - stmt, err := tx.PrepareContext(ctx, ` + stmt, err := conn.PrepareContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at) VALUES (?, ?) ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at @@ -117,8 +117,8 @@ func (s *SQLiteStorage) ClearDirtyIssuesByID(ctx context.Context, issueIDs []str return nil } - return s.withTx(ctx, func(tx *sql.Tx) error { - stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`) + return s.withTx(ctx, func(conn *sql.Conn) error { + stmt, err := conn.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`) if err != nil { return fmt.Errorf("failed to prepare statement: %w", err) } @@ -152,15 +152,16 @@ func (s *SQLiteStorage) GetDirtyIssueCount(ctx context.Context) (int, error) { return count, nil } -// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction -// This is a helper for operations that need to mark issues dirty as part of a larger transaction -func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, issueIDs []string) error { +// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction. +// This is a helper for operations that need to mark issues dirty as part of a larger transaction. +// The exec parameter can be either *sql.Tx or *sql.Conn. +func markIssuesDirtyTx(ctx context.Context, exec dbExecutor, issueIDs []string) error { if len(issueIDs) == 0 { return nil } now := time.Now() - stmt, err := tx.PrepareContext(ctx, ` + stmt, err := exec.PrepareContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at) VALUES (?, ?) ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at diff --git a/internal/storage/sqlite/events.go b/internal/storage/sqlite/events.go index d464ace9..76b3efe6 100644 --- a/internal/storage/sqlite/events.go +++ b/internal/storage/sqlite/events.go @@ -13,10 +13,10 @@ const limitClause = " LIMIT ?" // AddComment adds a comment to an issue func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error { - return s.withTx(ctx, func(tx *sql.Tx) error { + return s.withTx(ctx, func(conn *sql.Conn) error { // Update issue updated_at timestamp first to verify issue exists now := time.Now() - res, err := tx.ExecContext(ctx, ` + res, err := conn.ExecContext(ctx, ` UPDATE issues SET updated_at = ? WHERE id = ? `, now, issueID) if err != nil { @@ -31,7 +31,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment return fmt.Errorf("issue %s not found", issueID) } - _, err = tx.ExecContext(ctx, ` + _, err = conn.ExecContext(ctx, ` INSERT INTO events (issue_id, event_type, actor, comment) VALUES (?, ?, ?, ?) `, issueID, types.EventCommented, actor, comment) @@ -40,7 +40,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment } // Mark issue as dirty for incremental export - _, err = tx.ExecContext(ctx, ` + _, err = conn.ExecContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at) VALUES (?, ?) ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at diff --git a/internal/storage/sqlite/labels.go b/internal/storage/sqlite/labels.go index 11e73a5b..ec97ecd1 100644 --- a/internal/storage/sqlite/labels.go +++ b/internal/storage/sqlite/labels.go @@ -19,8 +19,8 @@ func (s *SQLiteStorage) executeLabelOperation( eventComment string, operationError string, ) error { - return s.withTx(ctx, func(tx *sql.Tx) error { - result, err := tx.ExecContext(ctx, labelSQL, labelSQLArgs...) + return s.withTx(ctx, func(conn *sql.Conn) error { + result, err := conn.ExecContext(ctx, labelSQL, labelSQLArgs...) if err != nil { return fmt.Errorf("%s: %w", operationError, err) } @@ -34,7 +34,7 @@ func (s *SQLiteStorage) executeLabelOperation( return nil } - _, err = tx.ExecContext(ctx, ` + _, err = conn.ExecContext(ctx, ` INSERT INTO events (issue_id, event_type, actor, comment) VALUES (?, ?, ?, ?) `, issueID, eventType, actor, eventComment) @@ -43,7 +43,7 @@ func (s *SQLiteStorage) executeLabelOperation( } // Mark issue as dirty for incremental export - _, err = tx.ExecContext(ctx, ` + _, err = conn.ExecContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at) VALUES (?, ?) ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at diff --git a/internal/storage/sqlite/queries.go b/internal/storage/sqlite/queries.go index ee2e21cc..e23dcb46 100644 --- a/internal/storage/sqlite/queries.go +++ b/internal/storage/sqlite/queries.go @@ -1407,82 +1407,73 @@ func (s *SQLiteStorage) CreateTombstone(ctx context.Context, id string, actor st // DeleteIssue permanently removes an issue from the database func (s *SQLiteStorage) DeleteIssue(ctx context.Context, id string) error { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - // Mark issues that depend on this one as dirty so they get re-exported - // without the stale dependency reference (fixes orphan deps in JSONL) - rows, err := tx.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id) - if err != nil { - return fmt.Errorf("failed to query dependent issues: %w", err) - } - var dependentIDs []string - for rows.Next() { - var depID string - if err := rows.Scan(&depID); err != nil { - _ = rows.Close() - return fmt.Errorf("failed to scan dependent issue ID: %w", err) + return s.withTx(ctx, func(conn *sql.Conn) error { + // Mark issues that depend on this one as dirty so they get re-exported + // without the stale dependency reference (fixes orphan deps in JSONL) + rows, err := conn.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id) + if err != nil { + return fmt.Errorf("failed to query dependent issues: %w", err) } - dependentIDs = append(dependentIDs, depID) - } - _ = rows.Close() - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to iterate dependent issues: %w", err) - } - - if len(dependentIDs) > 0 { - if err := markIssuesDirtyTx(ctx, tx, dependentIDs); err != nil { - return fmt.Errorf("failed to mark dependent issues dirty: %w", err) + var dependentIDs []string + for rows.Next() { + var depID string + if err := rows.Scan(&depID); err != nil { + _ = rows.Close() + return fmt.Errorf("failed to scan dependent issue ID: %w", err) + } + dependentIDs = append(dependentIDs, depID) + } + _ = rows.Close() + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate dependent issues: %w", err) } - } - // Delete dependencies (both directions) - _, err = tx.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id) - if err != nil { - return fmt.Errorf("failed to delete dependencies: %w", err) - } + if len(dependentIDs) > 0 { + if err := markIssuesDirtyTx(ctx, conn, dependentIDs); err != nil { + return fmt.Errorf("failed to mark dependent issues dirty: %w", err) + } + } - // Delete events - _, err = tx.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id) - if err != nil { - return fmt.Errorf("failed to delete events: %w", err) - } + // Delete dependencies (both directions) + _, err = conn.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id) + if err != nil { + return fmt.Errorf("failed to delete dependencies: %w", err) + } - // Delete comments (no FK cascade on this table) - _, err = tx.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id) - if err != nil { - return fmt.Errorf("failed to delete comments: %w", err) - } + // Delete events + _, err = conn.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id) + if err != nil { + return fmt.Errorf("failed to delete events: %w", err) + } - // Delete from dirty_issues - _, err = tx.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id) - if err != nil { - return fmt.Errorf("failed to delete dirty marker: %w", err) - } + // Delete comments (no FK cascade on this table) + _, err = conn.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id) + if err != nil { + return fmt.Errorf("failed to delete comments: %w", err) + } - // Delete the issue itself - result, err := tx.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id) - if err != nil { - return fmt.Errorf("failed to delete issue: %w", err) - } + // Delete from dirty_issues + _, err = conn.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id) + if err != nil { + return fmt.Errorf("failed to delete dirty marker: %w", err) + } - rowsAffected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("failed to check rows affected: %w", err) - } - if rowsAffected == 0 { - return fmt.Errorf("issue not found: %s", id) - } + // Delete the issue itself + result, err := conn.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id) + if err != nil { + return fmt.Errorf("failed to delete issue: %w", err) + } - if err := tx.Commit(); err != nil { - return wrapDBError("commit delete transaction", err) - } + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to check rows affected: %w", err) + } + if rowsAffected == 0 { + return fmt.Errorf("issue not found: %s", id) + } - // REMOVED: Counter sync after deletion - no longer needed with hash IDs - return nil + return nil + }) } // DeleteIssuesResult contains statistics about a batch deletion operation diff --git a/internal/storage/sqlite/util.go b/internal/storage/sqlite/util.go index 9a59c62e..528464ea 100644 --- a/internal/storage/sqlite/util.go +++ b/internal/storage/sqlite/util.go @@ -18,32 +18,63 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) { return s.db.BeginTx(ctx, nil) } -// withTx executes a function within a database transaction. +// withTx executes a function within a database transaction with retry logic. +// Uses BEGIN IMMEDIATE with exponential backoff retry on SQLITE_BUSY errors. // If the function returns an error, the transaction is rolled back. // Otherwise, the transaction is committed. -func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Tx) error) error { - tx, err := s.db.BeginTx(ctx, nil) +// +// This fixes GH#1272: database lock errors during concurrent operations. +func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Conn) error) error { + // Acquire a dedicated connection for the transaction. + // This ensures all operations in the transaction use the same connection. + conn, err := s.db.Conn(ctx) if err != nil { + return wrapDBError("acquire connection", err) + } + defer func() { _ = conn.Close() }() + + // Start IMMEDIATE transaction to acquire write lock early. + // Use retry logic with exponential backoff to handle SQLITE_BUSY + if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil { return wrapDBError("begin transaction", err) } - defer func() { _ = tx.Rollback() }() - if err := fn(tx); err != nil { + // Track commit state for cleanup + committed := false + defer func() { + if !committed { + // Use background context to ensure rollback completes even if ctx is canceled + _, _ = conn.ExecContext(context.Background(), "ROLLBACK") + } + }() + + // Execute user function + if err := fn(conn); err != nil { return err } - if err := tx.Commit(); err != nil { + // Commit the transaction + if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil { return wrapDBError("commit transaction", err) } - + committed = true return nil } // ExecInTransaction is deprecated. Use withTx instead. -func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error { +func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Conn) error) error { return s.withTx(ctx, fn) } +// dbExecutor is an interface satisfied by both *sql.Tx and *sql.Conn. +// This allows helper functions to work with either transaction type. +type dbExecutor interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) +} + // IsUniqueConstraintError checks if an error is a UNIQUE constraint violation func IsUniqueConstraintError(err error) bool { if err == nil { diff --git a/internal/storage/sqlite/util_test.go b/internal/storage/sqlite/util_test.go index 3ba7f187..9b9ced5e 100644 --- a/internal/storage/sqlite/util_test.go +++ b/internal/storage/sqlite/util_test.go @@ -104,8 +104,8 @@ func TestExecInTransaction(t *testing.T) { defer store.Close() t.Run("successful transaction", func(t *testing.T) { - err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error { - _, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value") + err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value") return err }) if err != nil { @@ -125,8 +125,8 @@ func TestExecInTransaction(t *testing.T) { t.Run("failed transaction rolls back", func(t *testing.T) { expectedErr := errors.New("intentional error") - err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error { - _, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value") + err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value") if err != nil { return err } From 35bd93b4434f6e1400a8464fc688d73cd7cfa3c1 Mon Sep 17 00:00:00 2001 From: ruby Date: Thu, 22 Jan 2026 18:55:30 -0800 Subject: [PATCH 2/2] fix(sqlite): use BEGIN IMMEDIATE without retry loop (GH#1272) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The original PR added retry logic on top of BEGIN IMMEDIATE, but this caused multi-minute hangs because: 1. Connection has busy_timeout=30s set via pragma 2. Each BEGIN IMMEDIATE waits up to 30s before returning SQLITE_BUSY 3. With 5 retries, worst case was 5 × 30s = 150+ seconds The fix removes the retry loop since SQLite's busy_timeout already handles retries internally. BEGIN IMMEDIATE still acquires the write lock early, preventing deadlocks - we just let busy_timeout handle contention. Root cause analysis in bd-9ldm. Co-Authored-By: Claude Opus 4.5 --- internal/storage/sqlite/batch_ops.go | 5 +- internal/storage/sqlite/queries.go | 4 +- internal/storage/sqlite/transaction.go | 5 +- internal/storage/sqlite/util.go | 79 ++++---------------------- internal/storage/sqlite/util_test.go | 38 +++++-------- 5 files changed, 32 insertions(+), 99 deletions(-) diff --git a/internal/storage/sqlite/batch_ops.go b/internal/storage/sqlite/batch_ops.go index 2505638c..26e61d25 100644 --- a/internal/storage/sqlite/batch_ops.go +++ b/internal/storage/sqlite/batch_ops.go @@ -273,8 +273,9 @@ func (s *SQLiteStorage) CreateIssuesWithFullOptions(ctx context.Context, issues } defer func() { _ = conn.Close() }() - // Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load - if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil { + // Start IMMEDIATE transaction to acquire write lock early. + // The connection's busy_timeout pragma (30s) handles retries if locked. + if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil { return fmt.Errorf("failed to begin immediate transaction: %w", err) } diff --git a/internal/storage/sqlite/queries.go b/internal/storage/sqlite/queries.go index e23dcb46..a2ae1613 100644 --- a/internal/storage/sqlite/queries.go +++ b/internal/storage/sqlite/queries.go @@ -158,8 +158,8 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act // We use raw Exec instead of BeginTx because database/sql doesn't support transaction // modes in BeginTx, and modernc.org/sqlite's BeginTx always uses DEFERRED mode. // - // Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load - if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil { + // The connection's busy_timeout pragma (30s) handles retries if locked. + if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil { return fmt.Errorf("failed to begin immediate transaction: %w", err) } diff --git a/internal/storage/sqlite/transaction.go b/internal/storage/sqlite/transaction.go index 45ff6164..0a148021 100644 --- a/internal/storage/sqlite/transaction.go +++ b/internal/storage/sqlite/transaction.go @@ -48,8 +48,9 @@ func (s *SQLiteStorage) RunInTransaction(ctx context.Context, fn func(tx storage defer func() { _ = conn.Close() }() // Start IMMEDIATE transaction to acquire write lock early. - // Use retry logic with exponential backoff to handle SQLITE_BUSY - if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil { + // BEGIN IMMEDIATE prevents deadlocks by acquiring the write lock upfront. + // The connection's busy_timeout pragma (30s) handles retries if locked. + if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } diff --git a/internal/storage/sqlite/util.go b/internal/storage/sqlite/util.go index 528464ea..cb9af87d 100644 --- a/internal/storage/sqlite/util.go +++ b/internal/storage/sqlite/util.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "strings" - "time" ) // QueryContext exposes the underlying database QueryContext method for advanced queries @@ -18,10 +17,13 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) { return s.db.BeginTx(ctx, nil) } -// withTx executes a function within a database transaction with retry logic. -// Uses BEGIN IMMEDIATE with exponential backoff retry on SQLITE_BUSY errors. -// If the function returns an error, the transaction is rolled back. -// Otherwise, the transaction is committed. +// withTx executes a function within a database transaction. +// Uses BEGIN IMMEDIATE to acquire the write lock early, preventing deadlocks +// in concurrent scenarios. If the function returns an error, the transaction +// is rolled back. Otherwise, the transaction is committed. +// +// The connection's busy_timeout pragma (30s by default) handles SQLITE_BUSY +// retries internally - no additional retry logic is needed here. // // This fixes GH#1272: database lock errors during concurrent operations. func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Conn) error) error { @@ -34,8 +36,10 @@ func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Conn) error) er defer func() { _ = conn.Close() }() // Start IMMEDIATE transaction to acquire write lock early. - // Use retry logic with exponential backoff to handle SQLITE_BUSY - if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil { + // BEGIN IMMEDIATE prevents deadlocks by acquiring the write lock upfront + // rather than upgrading from a read lock later. The connection's + // busy_timeout pragma (30s) handles retries if another writer holds the lock. + if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil { return wrapDBError("begin transaction", err) } @@ -103,64 +107,3 @@ func IsBusyError(err error) bool { return strings.Contains(errStr, "database is locked") || strings.Contains(errStr, "SQLITE_BUSY") } - -// beginImmediateWithRetry starts an IMMEDIATE transaction with exponential backoff retry -// on SQLITE_BUSY errors. This addresses bd-ola6: under concurrent write load, BEGIN IMMEDIATE -// can fail with SQLITE_BUSY, so we retry with exponential backoff instead of failing immediately. -// -// Parameters: -// - ctx: context for cancellation checking -// - conn: dedicated database connection (must use same connection for entire transaction) -// - maxRetries: maximum number of retry attempts (default: 5) -// - initialDelay: initial backoff delay (default: 10ms) -// -// Returns error if: -// - Context is canceled -// - BEGIN IMMEDIATE fails with non-busy error -// - All retries exhausted with SQLITE_BUSY -func beginImmediateWithRetry(ctx context.Context, conn *sql.Conn, maxRetries int, initialDelay time.Duration) error { - if maxRetries <= 0 { - maxRetries = 5 - } - if initialDelay <= 0 { - initialDelay = 10 * time.Millisecond - } - - var lastErr error - delay := initialDelay - - for attempt := 0; attempt <= maxRetries; attempt++ { - // Check context cancellation before each attempt - if err := ctx.Err(); err != nil { - return err - } - - // Attempt to begin transaction - _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE") - if err == nil { - return nil // Success - } - - lastErr = err - - // If not a busy error, fail immediately - if !IsBusyError(err) { - return err - } - - // On last attempt, don't sleep - if attempt == maxRetries { - break - } - - // Exponential backoff: sleep before retry - select { - case <-time.After(delay): - delay *= 2 // Double the delay for next attempt - case <-ctx.Done(): - return ctx.Err() - } - } - - return lastErr // Return the last SQLITE_BUSY error after exhausting retries -} diff --git a/internal/storage/sqlite/util_test.go b/internal/storage/sqlite/util_test.go index 9b9ced5e..5aab303a 100644 --- a/internal/storage/sqlite/util_test.go +++ b/internal/storage/sqlite/util_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "errors" + "strings" "testing" ) @@ -257,21 +258,24 @@ func TestIsBusyError(t *testing.T) { } } -func TestBeginImmediateWithRetry(t *testing.T) { +// TestBeginImmediate tests that BEGIN IMMEDIATE transactions work correctly. +// Note: The retry logic was removed because SQLite's busy_timeout pragma (30s) +// already handles retries internally. See GH#1272 for details. +func TestBeginImmediate(t *testing.T) { ctx := context.Background() store := newTestStore(t, t.TempDir()+"/test.db") defer store.Close() - t.Run("successful on first try", func(t *testing.T) { + t.Run("successful BEGIN IMMEDIATE", func(t *testing.T) { conn, err := store.db.Conn(ctx) if err != nil { t.Fatalf("Failed to acquire connection: %v", err) } defer conn.Close() - err = beginImmediateWithRetry(ctx, conn, 5, 10) + _, err = conn.ExecContext(ctx, "BEGIN IMMEDIATE") if err != nil { - t.Errorf("beginImmediateWithRetry failed: %v", err) + t.Errorf("BEGIN IMMEDIATE failed: %v", err) } // Rollback to clean up @@ -288,30 +292,14 @@ func TestBeginImmediateWithRetry(t *testing.T) { cancelCtx, cancel := context.WithCancel(ctx) cancel() // Cancel immediately - err = beginImmediateWithRetry(cancelCtx, conn, 5, 10) + _, err = conn.ExecContext(cancelCtx, "BEGIN IMMEDIATE") if err == nil { - t.Error("Expected context cancellation error, got nil") + t.Error("Expected error due to canceled context, got nil") _, _ = conn.ExecContext(context.Background(), "ROLLBACK") } - if !errors.Is(err, context.Canceled) { - t.Errorf("Expected context.Canceled, got %v", err) + // sqlite3 driver returns "interrupted" error rather than wrapping context.Canceled + if err != nil && !errors.Is(err, context.Canceled) && !strings.Contains(err.Error(), "interrupt") { + t.Errorf("Expected context cancellation or interrupt error, got %v", err) } }) - - t.Run("defaults for invalid parameters", func(t *testing.T) { - conn, err := store.db.Conn(ctx) - if err != nil { - t.Fatalf("Failed to acquire connection: %v", err) - } - defer conn.Close() - - // Should use defaults (5 retries, 10ms delay) when passed invalid values - err = beginImmediateWithRetry(ctx, conn, 0, 0) - if err != nil { - t.Errorf("beginImmediateWithRetry with invalid params failed: %v", err) - } - - // Rollback to clean up - _, _ = conn.ExecContext(context.Background(), "ROLLBACK") - }) }