From 7d6d64d2c1568100369b5f911e2e01550c0a5dc7 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Sun, 2 Nov 2025 12:55:47 -0800 Subject: [PATCH] Refactor: Replace manual transaction handling with withTx() helper Fixes bd-1b0a Changes: - Added withTx() helper in util.go for cleaner transaction handling - Kept ExecInTransaction() as deprecated wrapper for backward compatibility - Refactored all manual BEGIN/COMMIT/ROLLBACK blocks to use withTx(): - events.go: AddComment - dirty.go: MarkIssuesDirty, ClearDirtyIssuesByID - labels.go: executeLabelOperation - dependencies.go: AddDependency, RemoveDependency - compact.go: ApplyCompaction All tests pass. Amp-Thread-ID: https://ampcode.com/threads/T-dfacc972-f6c8-4bb3-8997-faa079b5d070 Co-authored-by: Amp --- internal/storage/sqlite/compact.go | 86 +++++++++++-------------- internal/storage/sqlite/dependencies.go | 84 +++++++++++------------- internal/storage/sqlite/dirty.go | 70 +++++++++----------- internal/storage/sqlite/events.go | 58 ++++++++--------- internal/storage/sqlite/labels.go | 51 +++++++-------- internal/storage/sqlite/util.go | 9 ++- 6 files changed, 166 insertions(+), 192 deletions(-) diff --git a/internal/storage/sqlite/compact.go b/internal/storage/sqlite/compact.go index e2c80d02..2870ef24 100644 --- a/internal/storage/sqlite/compact.go +++ b/internal/storage/sqlite/compact.go @@ -267,51 +267,43 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error { now := time.Now().UTC() - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - var commitHashPtr *string - if commitHash != "" { - commitHashPtr = &commitHash - } - - _, err = tx.ExecContext(ctx, ` - UPDATE issues - SET compaction_level = ?, - compacted_at = ?, - compacted_at_commit = ?, - original_size = ?, - updated_at = ? - WHERE id = ? - `, level, now, commitHashPtr, originalSize, now, issueID) - - if err != nil { - return fmt.Errorf("failed to apply compaction metadata: %w", err) - } - - reductionPct := 0.0 - if originalSize > 0 { - reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100 - } - - eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`, - level, originalSize, compressedSize, reductionPct) - - _, err = tx.ExecContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, comment) - VALUES (?, ?, 'compactor', ?) - `, issueID, types.EventCompacted, eventData) - - if err != nil { - return fmt.Errorf("failed to record compaction event: %w", err) - } - - if err := tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) - } - - return nil + return s.withTx(ctx, func(tx *sql.Tx) error { + var commitHashPtr *string + if commitHash != "" { + commitHashPtr = &commitHash + } + + _, err := tx.ExecContext(ctx, ` + UPDATE issues + SET compaction_level = ?, + compacted_at = ?, + compacted_at_commit = ?, + original_size = ?, + updated_at = ? + WHERE id = ? + `, level, now, commitHashPtr, originalSize, now, issueID) + + if err != nil { + return fmt.Errorf("failed to apply compaction metadata: %w", err) + } + + reductionPct := 0.0 + if originalSize > 0 { + reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100 + } + + eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`, + level, originalSize, compressedSize, reductionPct) + + _, err = tx.ExecContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, comment) + VALUES (?, ?, 'compactor', ?) + `, issueID, types.EventCompacted, eventData) + + if err != nil { + return fmt.Errorf("failed to record compaction event: %w", err) + } + + return nil + }) } diff --git a/internal/storage/sqlite/dependencies.go b/internal/storage/sqlite/dependencies.go index 7f4a791e..1e35e961 100644 --- a/internal/storage/sqlite/dependencies.go +++ b/internal/storage/sqlite/dependencies.go @@ -68,13 +68,8 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency dep.CreatedBy = actor } - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - // Cycle Detection and Prevention + return s.withTx(ctx, func(tx *sql.Tx) error { + // Cycle Detection and Prevention // // We prevent cycles across ALL dependency types (blocks, related, parent-child, discovered-from) // to maintain a directed acyclic graph (DAG). This is critical for: @@ -149,54 +144,51 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency return fmt.Errorf("failed to record event: %w", err) } - // Mark both issues as dirty for incremental export - // (dependencies are exported with each issue, so both need updating) - if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil { - return err - } + // Mark both issues as dirty for incremental export + // (dependencies are exported with each issue, so both need updating) + if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil { + return err + } - return tx.Commit() + return nil + }) } // RemoveDependency removes a dependency func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() + return s.withTx(ctx, func(tx *sql.Tx) error { + result, err := tx.ExecContext(ctx, ` + DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? + `, issueID, dependsOnID) + if err != nil { + return fmt.Errorf("failed to remove dependency: %w", err) + } - result, err := tx.ExecContext(ctx, ` - DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? - `, issueID, dependsOnID) - if err != nil { - return fmt.Errorf("failed to remove dependency: %w", err) - } + // Check if dependency existed + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to check rows affected: %w", err) + } + if rowsAffected == 0 { + return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID) + } - // Check if dependency existed - rowsAffected, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("failed to check rows affected: %w", err) - } - if rowsAffected == 0 { - return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID) - } + _, err = tx.ExecContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, comment) + VALUES (?, ?, ?, ?) + `, issueID, types.EventDependencyRemoved, actor, + fmt.Sprintf("Removed dependency on %s", dependsOnID)) + if err != nil { + return fmt.Errorf("failed to record event: %w", err) + } - _, err = tx.ExecContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, comment) - VALUES (?, ?, ?, ?) - `, issueID, types.EventDependencyRemoved, actor, - fmt.Sprintf("Removed dependency on %s", dependsOnID)) - if err != nil { - return fmt.Errorf("failed to record event: %w", err) - } + // Mark both issues as dirty for incremental export + if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil { + return err + } - // Mark both issues as dirty for incremental export - if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil { - return err - } - - return tx.Commit() + return nil + }) } // GetDependenciesWithMetadata returns issues that this issue depends on, including dependency type diff --git a/internal/storage/sqlite/dirty.go b/internal/storage/sqlite/dirty.go index 0e38671d..27d6bd5e 100644 --- a/internal/storage/sqlite/dirty.go +++ b/internal/storage/sqlite/dirty.go @@ -26,30 +26,26 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string) return nil } - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - now := time.Now() - stmt, err := tx.PrepareContext(ctx, ` - INSERT INTO dirty_issues (issue_id, marked_at) - VALUES (?, ?) - ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at - `) - if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer func() { _ = stmt.Close() }() - - for _, issueID := range issueIDs { - if _, err := stmt.ExecContext(ctx, issueID, now); err != nil { - return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err) + return s.withTx(ctx, func(tx *sql.Tx) error { + now := time.Now() + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO dirty_issues (issue_id, marked_at) + VALUES (?, ?) + ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at + `) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) } - } + defer func() { _ = stmt.Close() }() - return tx.Commit() + for _, issueID := range issueIDs { + if _, err := stmt.ExecContext(ctx, issueID, now); err != nil { + return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err) + } + } + + return nil + }) } // GetDirtyIssues returns the list of issue IDs that need to be exported @@ -116,25 +112,21 @@ func (s *SQLiteStorage) ClearDirtyIssuesByID(ctx context.Context, issueIDs []str return nil } - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`) - if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer func() { _ = stmt.Close() }() - - for _, issueID := range issueIDs { - if _, err := stmt.ExecContext(ctx, issueID); err != nil { - return fmt.Errorf("failed to clear dirty issue %s: %w", issueID, err) + return s.withTx(ctx, func(tx *sql.Tx) error { + stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) } - } + defer func() { _ = stmt.Close() }() - return tx.Commit() + for _, issueID := range issueIDs { + if _, err := stmt.ExecContext(ctx, issueID); err != nil { + return fmt.Errorf("failed to clear dirty issue %s: %w", issueID, err) + } + } + + return nil + }) } // GetDirtyIssueCount returns the count of dirty issues (for monitoring/debugging) diff --git a/internal/storage/sqlite/events.go b/internal/storage/sqlite/events.go index 650e0acc..256e3d97 100644 --- a/internal/storage/sqlite/events.go +++ b/internal/storage/sqlite/events.go @@ -13,40 +13,36 @@ const limitClause = " LIMIT ?" // AddComment adds a comment to an issue func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() + return s.withTx(ctx, func(tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, comment) + VALUES (?, ?, ?, ?) + `, issueID, types.EventCommented, actor, comment) + if err != nil { + return fmt.Errorf("failed to add comment: %w", err) + } - _, err = tx.ExecContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, comment) - VALUES (?, ?, ?, ?) - `, issueID, types.EventCommented, actor, comment) - if err != nil { - return fmt.Errorf("failed to add comment: %w", err) - } + // Update issue updated_at timestamp + now := time.Now() + _, err = tx.ExecContext(ctx, ` + UPDATE issues SET updated_at = ? WHERE id = ? + `, now, issueID) + if err != nil { + return fmt.Errorf("failed to update timestamp: %w", err) + } - // Update issue updated_at timestamp - now := time.Now() - _, err = tx.ExecContext(ctx, ` - UPDATE issues SET updated_at = ? WHERE id = ? - `, now, issueID) - if err != nil { - return fmt.Errorf("failed to update timestamp: %w", err) - } + // Mark issue as dirty for incremental export + _, err = tx.ExecContext(ctx, ` + INSERT INTO dirty_issues (issue_id, marked_at) + VALUES (?, ?) + ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at + `, issueID, now) + if err != nil { + return fmt.Errorf("failed to mark issue dirty: %w", err) + } - // Mark issue as dirty for incremental export - _, err = tx.ExecContext(ctx, ` - INSERT INTO dirty_issues (issue_id, marked_at) - VALUES (?, ?) - ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at - `, issueID, now) - if err != nil { - return fmt.Errorf("failed to mark issue dirty: %w", err) - } - - return tx.Commit() + return nil + }) } // GetEvents returns the event history for an issue diff --git a/internal/storage/sqlite/labels.go b/internal/storage/sqlite/labels.go index 77dbbf1f..dc113fbe 100644 --- a/internal/storage/sqlite/labels.go +++ b/internal/storage/sqlite/labels.go @@ -2,6 +2,7 @@ package sqlite import ( "context" + "database/sql" "fmt" "time" @@ -18,36 +19,32 @@ func (s *SQLiteStorage) executeLabelOperation( eventComment string, operationError string, ) error { - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() + return s.withTx(ctx, func(tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, labelSQL, labelSQLArgs...) + if err != nil { + return fmt.Errorf("%s: %w", operationError, err) + } - _, err = tx.ExecContext(ctx, labelSQL, labelSQLArgs...) - if err != nil { - return fmt.Errorf("%s: %w", operationError, err) - } + _, err = tx.ExecContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, comment) + VALUES (?, ?, ?, ?) + `, issueID, eventType, actor, eventComment) + if err != nil { + return fmt.Errorf("failed to record event: %w", err) + } - _, err = tx.ExecContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, comment) - VALUES (?, ?, ?, ?) - `, issueID, eventType, actor, eventComment) - if err != nil { - return fmt.Errorf("failed to record event: %w", err) - } + // Mark issue as dirty for incremental export + _, err = tx.ExecContext(ctx, ` + INSERT INTO dirty_issues (issue_id, marked_at) + VALUES (?, ?) + ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at + `, issueID, time.Now()) + if err != nil { + return fmt.Errorf("failed to mark issue dirty: %w", err) + } - // Mark issue as dirty for incremental export - _, err = tx.ExecContext(ctx, ` - INSERT INTO dirty_issues (issue_id, marked_at) - VALUES (?, ?) - ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at - `, issueID, time.Now()) - if err != nil { - return fmt.Errorf("failed to mark issue dirty: %w", err) - } - - return tx.Commit() + return nil + }) } // AddLabel adds a label to an issue diff --git a/internal/storage/sqlite/util.go b/internal/storage/sqlite/util.go index 606b5a13..f7dcf543 100644 --- a/internal/storage/sqlite/util.go +++ b/internal/storage/sqlite/util.go @@ -18,10 +18,10 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) { return s.db.BeginTx(ctx, nil) } -// ExecInTransaction executes a function within a database transaction. +// withTx executes a function within a database transaction. // If the function returns an error, the transaction is rolled back. // Otherwise, the transaction is committed. -func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error { +func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Tx) error) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) @@ -39,6 +39,11 @@ func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) return nil } +// ExecInTransaction is deprecated. Use withTx instead. +func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error { + return s.withTx(ctx, fn) +} + // IsUniqueConstraintError checks if an error is a UNIQUE constraint violation func IsUniqueConstraintError(err error) bool { if err == nil {