fix(sqlite): add retry logic to transaction entry points (GH#1272)

Update withTx to use BEGIN IMMEDIATE with exponential backoff retry
on SQLITE_BUSY errors. This prevents "database is locked" failures
during concurrent operations (daemon + CLI, multi-agent workflows).

Changes:
- withTx now uses beginImmediateWithRetry (same pattern as RunInTransaction)
- Add dbExecutor interface for helper functions that work with both
  *sql.Tx and *sql.Conn
- Update all withTx callers to use *sql.Conn
- Refactor DeleteIssue to use withTx (fixes the specific error in auto-import)
- Update markIssuesDirtyTx to accept dbExecutor interface

Affected paths:
- MarkIssuesDirty, ClearDirtyIssuesByID (dirty.go)
- AddDependency, RemoveDependency (dependencies.go)
- executeLabelOperation (labels.go)
- AddComment (events.go)
- ApplyCompaction (compact.go)
- DeleteIssue (queries.go)

Note: Some direct BeginTx calls in queries.go (CloseIssue, UpdateIssue,
ReopenIssue, DeleteIssues) still use the old pattern and could be
refactored in a follow-up.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
ruby
2026-01-22 18:30:48 -08:00
committed by Aaron Leiby
parent 1f94b4b363
commit 718fc49776
8 changed files with 156 additions and 133 deletions

View File

@@ -274,14 +274,14 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti
// This sets compaction_level, compacted_at, compacted_at_commit, and original_size fields. // This sets compaction_level, compacted_at, compacted_at_commit, and original_size fields.
func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error { func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error {
now := time.Now().UTC() now := time.Now().UTC()
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
var commitHashPtr *string var commitHashPtr *string
if commitHash != "" { if commitHash != "" {
commitHashPtr = &commitHash commitHashPtr = &commitHash
} }
res, err := tx.ExecContext(ctx, ` res, err := conn.ExecContext(ctx, `
UPDATE issues UPDATE issues
SET compaction_level = ?, SET compaction_level = ?,
compacted_at = ?, compacted_at = ?,
@@ -290,11 +290,11 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev
updated_at = ? updated_at = ?
WHERE id = ? WHERE id = ?
`, level, now, commitHashPtr, originalSize, now, issueID) `, level, now, commitHashPtr, originalSize, now, issueID)
if err != nil { if err != nil {
return fmt.Errorf("failed to apply compaction metadata: %w", err) return fmt.Errorf("failed to apply compaction metadata: %w", err)
} }
rows, err := res.RowsAffected() rows, err := res.RowsAffected()
if err != nil { if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err) return fmt.Errorf("failed to get rows affected: %w", err)
@@ -302,24 +302,24 @@ func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, lev
if rows == 0 { if rows == 0 {
return fmt.Errorf("issue %s not found", issueID) return fmt.Errorf("issue %s not found", issueID)
} }
reductionPct := 0.0 reductionPct := 0.0
if originalSize > 0 { if originalSize > 0 {
reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100 reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100
} }
eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`, eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`,
level, originalSize, compressedSize, reductionPct) level, originalSize, compressedSize, reductionPct)
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment) INSERT INTO events (issue_id, event_type, actor, comment)
VALUES (?, ?, 'compactor', ?) VALUES (?, ?, 'compactor', ?)
`, issueID, types.EventCompacted, eventData) `, issueID, types.EventCompacted, eventData)
if err != nil { if err != nil {
return fmt.Errorf("failed to record compaction event: %w", err) return fmt.Errorf("failed to record compaction event: %w", err)
} }
return nil return nil
}) })
} }

View File

@@ -75,7 +75,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
dep.CreatedBy = actor dep.CreatedBy = actor
} }
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
// Cycle Detection and Prevention // Cycle Detection and Prevention
// //
// We prevent cycles across most dependency types to maintain a directed acyclic graph (DAG). // We prevent cycles across most dependency types to maintain a directed acyclic graph (DAG).
@@ -105,7 +105,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
// Skip cycle detection for relates-to (inherently bidirectional) // Skip cycle detection for relates-to (inherently bidirectional)
if dep.Type != types.DepRelatesTo { if dep.Type != types.DepRelatesTo {
var cycleExists bool var cycleExists bool
err = tx.QueryRowContext(ctx, ` err = conn.QueryRowContext(ctx, `
WITH RECURSIVE paths AS ( WITH RECURSIVE paths AS (
SELECT SELECT
issue_id, issue_id,
@@ -140,24 +140,24 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
} }
} }
// Insert dependency (including metadata and thread_id for edge consolidation - Decision 004) // Insert dependency (including metadata and thread_id for edge consolidation - Decision 004)
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id) INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
VALUES (?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?)
`, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID) `, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy, dep.Metadata, dep.ThreadID)
if err != nil { if err != nil {
return fmt.Errorf("failed to add dependency: %w", err) return fmt.Errorf("failed to add dependency: %w", err)
} }
// Record event // Record event
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment) INSERT INTO events (issue_id, event_type, actor, comment)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?)
`, dep.IssueID, types.EventDependencyAdded, actor, `, dep.IssueID, types.EventDependencyAdded, actor,
fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID)) fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID))
if err != nil { if err != nil {
return fmt.Errorf("failed to record event: %w", err) return fmt.Errorf("failed to record event: %w", err)
} }
// Mark issues as dirty for incremental export // Mark issues as dirty for incremental export
// For external refs, only mark the source issue (target doesn't exist locally) // For external refs, only mark the source issue (target doesn't exist locally)
@@ -165,14 +165,14 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
if !isExternalRef { if !isExternalRef {
issueIDsToMark = append(issueIDsToMark, dep.DependsOnID) issueIDsToMark = append(issueIDsToMark, dep.DependsOnID)
} }
if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil { if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil {
return wrapDBError("mark issues dirty after adding dependency", err) return wrapDBError("mark issues dirty after adding dependency", err)
} }
// Invalidate blocked issues cache since dependencies changed // Invalidate blocked issues cache since dependencies changed
// Only invalidate for types that affect ready work calculation // Only invalidate for types that affect ready work calculation
if dep.Type.AffectsReadyWork() { if dep.Type.AffectsReadyWork() {
if err := s.invalidateBlockedCache(ctx, tx); err != nil { if err := s.invalidateBlockedCache(ctx, conn); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err) return fmt.Errorf("failed to invalidate blocked cache: %w", err)
} }
} }
@@ -183,10 +183,10 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
// RemoveDependency removes a dependency // RemoveDependency removes a dependency
func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error { func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
// First, check what type of dependency is being removed // First, check what type of dependency is being removed
var depType types.DependencyType var depType types.DependencyType
err := tx.QueryRowContext(ctx, ` err := conn.QueryRowContext(ctx, `
SELECT type FROM dependencies WHERE issue_id = ? AND depends_on_id = ? SELECT type FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
`, issueID, dependsOnID).Scan(&depType) `, issueID, dependsOnID).Scan(&depType)
@@ -196,7 +196,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
needsCacheInvalidation = depType.AffectsReadyWork() needsCacheInvalidation = depType.AffectsReadyWork()
} }
result, err := tx.ExecContext(ctx, ` result, err := conn.ExecContext(ctx, `
DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
`, issueID, dependsOnID) `, issueID, dependsOnID)
if err != nil { if err != nil {
@@ -212,7 +212,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID) return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID)
} }
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment) INSERT INTO events (issue_id, event_type, actor, comment)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?)
`, issueID, types.EventDependencyRemoved, actor, `, issueID, types.EventDependencyRemoved, actor,
@@ -227,13 +227,13 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
if !strings.HasPrefix(dependsOnID, "external:") { if !strings.HasPrefix(dependsOnID, "external:") {
issueIDsToMark = append(issueIDsToMark, dependsOnID) issueIDsToMark = append(issueIDsToMark, dependsOnID)
} }
if err := markIssuesDirtyTx(ctx, tx, issueIDsToMark); err != nil { if err := markIssuesDirtyTx(ctx, conn, issueIDsToMark); err != nil {
return wrapDBError("mark issues dirty after removing dependency", err) return wrapDBError("mark issues dirty after removing dependency", err)
} }
// Invalidate blocked issues cache if this was a blocking dependency // Invalidate blocked issues cache if this was a blocking dependency
if needsCacheInvalidation { if needsCacheInvalidation {
if err := s.invalidateBlockedCache(ctx, tx); err != nil { if err := s.invalidateBlockedCache(ctx, conn); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err) return fmt.Errorf("failed to invalidate blocked cache: %w", err)
} }
} }

View File

@@ -31,9 +31,9 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string)
return nil return nil
} }
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
now := time.Now() now := time.Now()
stmt, err := tx.PrepareContext(ctx, ` stmt, err := conn.PrepareContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at) INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?) VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
@@ -117,8 +117,8 @@ func (s *SQLiteStorage) ClearDirtyIssuesByID(ctx context.Context, issueIDs []str
return nil return nil
} }
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`) stmt, err := conn.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`)
if err != nil { if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err) return fmt.Errorf("failed to prepare statement: %w", err)
} }
@@ -152,15 +152,16 @@ func (s *SQLiteStorage) GetDirtyIssueCount(ctx context.Context) (int, error) {
return count, nil return count, nil
} }
// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction // markIssuesDirtyTx marks multiple issues as dirty within an existing transaction.
// This is a helper for operations that need to mark issues dirty as part of a larger transaction // This is a helper for operations that need to mark issues dirty as part of a larger transaction.
func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, issueIDs []string) error { // The exec parameter can be either *sql.Tx or *sql.Conn.
func markIssuesDirtyTx(ctx context.Context, exec dbExecutor, issueIDs []string) error {
if len(issueIDs) == 0 { if len(issueIDs) == 0 {
return nil return nil
} }
now := time.Now() now := time.Now()
stmt, err := tx.PrepareContext(ctx, ` stmt, err := exec.PrepareContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at) INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?) VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at

View File

@@ -13,10 +13,10 @@ const limitClause = " LIMIT ?"
// AddComment adds a comment to an issue // AddComment adds a comment to an issue
func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error { func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error {
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
// Update issue updated_at timestamp first to verify issue exists // Update issue updated_at timestamp first to verify issue exists
now := time.Now() now := time.Now()
res, err := tx.ExecContext(ctx, ` res, err := conn.ExecContext(ctx, `
UPDATE issues SET updated_at = ? WHERE id = ? UPDATE issues SET updated_at = ? WHERE id = ?
`, now, issueID) `, now, issueID)
if err != nil { if err != nil {
@@ -31,7 +31,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment
return fmt.Errorf("issue %s not found", issueID) return fmt.Errorf("issue %s not found", issueID)
} }
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment) INSERT INTO events (issue_id, event_type, actor, comment)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?)
`, issueID, types.EventCommented, actor, comment) `, issueID, types.EventCommented, actor, comment)
@@ -40,7 +40,7 @@ func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment
} }
// Mark issue as dirty for incremental export // Mark issue as dirty for incremental export
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at) INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?) VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at

View File

@@ -19,8 +19,8 @@ func (s *SQLiteStorage) executeLabelOperation(
eventComment string, eventComment string,
operationError string, operationError string,
) error { ) error {
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(conn *sql.Conn) error {
result, err := tx.ExecContext(ctx, labelSQL, labelSQLArgs...) result, err := conn.ExecContext(ctx, labelSQL, labelSQLArgs...)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", operationError, err) return fmt.Errorf("%s: %w", operationError, err)
} }
@@ -34,7 +34,7 @@ func (s *SQLiteStorage) executeLabelOperation(
return nil return nil
} }
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment) INSERT INTO events (issue_id, event_type, actor, comment)
VALUES (?, ?, ?, ?) VALUES (?, ?, ?, ?)
`, issueID, eventType, actor, eventComment) `, issueID, eventType, actor, eventComment)
@@ -43,7 +43,7 @@ func (s *SQLiteStorage) executeLabelOperation(
} }
// Mark issue as dirty for incremental export // Mark issue as dirty for incremental export
_, err = tx.ExecContext(ctx, ` _, err = conn.ExecContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at) INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?) VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at

View File

@@ -1407,82 +1407,73 @@ func (s *SQLiteStorage) CreateTombstone(ctx context.Context, id string, actor st
// DeleteIssue permanently removes an issue from the database // DeleteIssue permanently removes an issue from the database
func (s *SQLiteStorage) DeleteIssue(ctx context.Context, id string) error { func (s *SQLiteStorage) DeleteIssue(ctx context.Context, id string) error {
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(conn *sql.Conn) error {
if err != nil { // Mark issues that depend on this one as dirty so they get re-exported
return fmt.Errorf("failed to begin transaction: %w", err) // without the stale dependency reference (fixes orphan deps in JSONL)
} rows, err := conn.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
defer func() { _ = tx.Rollback() }() if err != nil {
return fmt.Errorf("failed to query dependent issues: %w", err)
// Mark issues that depend on this one as dirty so they get re-exported
// without the stale dependency reference (fixes orphan deps in JSONL)
rows, err := tx.QueryContext(ctx, `SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
if err != nil {
return fmt.Errorf("failed to query dependent issues: %w", err)
}
var dependentIDs []string
for rows.Next() {
var depID string
if err := rows.Scan(&depID); err != nil {
_ = rows.Close()
return fmt.Errorf("failed to scan dependent issue ID: %w", err)
} }
dependentIDs = append(dependentIDs, depID) var dependentIDs []string
} for rows.Next() {
_ = rows.Close() var depID string
if err := rows.Err(); err != nil { if err := rows.Scan(&depID); err != nil {
return fmt.Errorf("failed to iterate dependent issues: %w", err) _ = rows.Close()
} return fmt.Errorf("failed to scan dependent issue ID: %w", err)
}
if len(dependentIDs) > 0 { dependentIDs = append(dependentIDs, depID)
if err := markIssuesDirtyTx(ctx, tx, dependentIDs); err != nil { }
return fmt.Errorf("failed to mark dependent issues dirty: %w", err) _ = rows.Close()
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate dependent issues: %w", err)
} }
}
// Delete dependencies (both directions) if len(dependentIDs) > 0 {
_, err = tx.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id) if err := markIssuesDirtyTx(ctx, conn, dependentIDs); err != nil {
if err != nil { return fmt.Errorf("failed to mark dependent issues dirty: %w", err)
return fmt.Errorf("failed to delete dependencies: %w", err) }
} }
// Delete events // Delete dependencies (both directions)
_, err = tx.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id) _, err = conn.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete events: %w", err) return fmt.Errorf("failed to delete dependencies: %w", err)
} }
// Delete comments (no FK cascade on this table) // Delete events
_, err = tx.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id) _, err = conn.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete comments: %w", err) return fmt.Errorf("failed to delete events: %w", err)
} }
// Delete from dirty_issues // Delete comments (no FK cascade on this table)
_, err = tx.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id) _, err = conn.ExecContext(ctx, `DELETE FROM comments WHERE issue_id = ?`, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete dirty marker: %w", err) return fmt.Errorf("failed to delete comments: %w", err)
} }
// Delete the issue itself // Delete from dirty_issues
result, err := tx.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id) _, err = conn.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id)
if err != nil { if err != nil {
return fmt.Errorf("failed to delete issue: %w", err) return fmt.Errorf("failed to delete dirty marker: %w", err)
} }
rowsAffected, err := result.RowsAffected() // Delete the issue itself
if err != nil { result, err := conn.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id)
return fmt.Errorf("failed to check rows affected: %w", err) if err != nil {
} return fmt.Errorf("failed to delete issue: %w", err)
if rowsAffected == 0 { }
return fmt.Errorf("issue not found: %s", id)
}
if err := tx.Commit(); err != nil { rowsAffected, err := result.RowsAffected()
return wrapDBError("commit delete transaction", err) if err != nil {
} return fmt.Errorf("failed to check rows affected: %w", err)
}
if rowsAffected == 0 {
return fmt.Errorf("issue not found: %s", id)
}
// REMOVED: Counter sync after deletion - no longer needed with hash IDs return nil
return nil })
} }
// DeleteIssuesResult contains statistics about a batch deletion operation // DeleteIssuesResult contains statistics about a batch deletion operation

View File

@@ -18,32 +18,63 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) {
return s.db.BeginTx(ctx, nil) return s.db.BeginTx(ctx, nil)
} }
// withTx executes a function within a database transaction. // withTx executes a function within a database transaction with retry logic.
// Uses BEGIN IMMEDIATE with exponential backoff retry on SQLITE_BUSY errors.
// If the function returns an error, the transaction is rolled back. // If the function returns an error, the transaction is rolled back.
// Otherwise, the transaction is committed. // Otherwise, the transaction is committed.
func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Tx) error) error { //
tx, err := s.db.BeginTx(ctx, nil) // This fixes GH#1272: database lock errors during concurrent operations.
func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Conn) error) error {
// Acquire a dedicated connection for the transaction.
// This ensures all operations in the transaction use the same connection.
conn, err := s.db.Conn(ctx)
if err != nil { if err != nil {
return wrapDBError("acquire connection", err)
}
defer func() { _ = conn.Close() }()
// Start IMMEDIATE transaction to acquire write lock early.
// Use retry logic with exponential backoff to handle SQLITE_BUSY
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
return wrapDBError("begin transaction", err) return wrapDBError("begin transaction", err)
} }
defer func() { _ = tx.Rollback() }()
if err := fn(tx); err != nil { // Track commit state for cleanup
committed := false
defer func() {
if !committed {
// Use background context to ensure rollback completes even if ctx is canceled
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
}
}()
// Execute user function
if err := fn(conn); err != nil {
return err return err
} }
if err := tx.Commit(); err != nil { // Commit the transaction
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
return wrapDBError("commit transaction", err) return wrapDBError("commit transaction", err)
} }
committed = true
return nil return nil
} }
// ExecInTransaction is deprecated. Use withTx instead. // ExecInTransaction is deprecated. Use withTx instead.
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error { func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Conn) error) error {
return s.withTx(ctx, fn) return s.withTx(ctx, fn)
} }
// dbExecutor is an interface satisfied by both *sql.Tx and *sql.Conn.
// This allows helper functions to work with either transaction type.
type dbExecutor interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
// IsUniqueConstraintError checks if an error is a UNIQUE constraint violation // IsUniqueConstraintError checks if an error is a UNIQUE constraint violation
func IsUniqueConstraintError(err error) bool { func IsUniqueConstraintError(err error) bool {
if err == nil { if err == nil {

View File

@@ -104,8 +104,8 @@ func TestExecInTransaction(t *testing.T) {
defer store.Close() defer store.Close()
t.Run("successful transaction", func(t *testing.T) { t.Run("successful transaction", func(t *testing.T) {
err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error { err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error {
_, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value") _, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "test_key", "test_value")
return err return err
}) })
if err != nil { if err != nil {
@@ -125,8 +125,8 @@ func TestExecInTransaction(t *testing.T) {
t.Run("failed transaction rolls back", func(t *testing.T) { t.Run("failed transaction rolls back", func(t *testing.T) {
expectedErr := errors.New("intentional error") expectedErr := errors.New("intentional error")
err := store.ExecInTransaction(ctx, func(tx *sql.Tx) error { err := store.ExecInTransaction(ctx, func(conn *sql.Conn) error {
_, err := tx.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value") _, err := conn.ExecContext(ctx, "INSERT INTO config (key, value) VALUES (?, ?)", "rollback_key", "rollback_value")
if err != nil { if err != nil {
return err return err
} }