Refactor: Replace manual transaction handling with withTx() helper

Fixes bd-1b0a

Changes:
- Added withTx() helper in util.go for cleaner transaction handling
- Kept ExecInTransaction() as deprecated wrapper for backward compatibility
- Refactored all manual BEGIN/COMMIT/ROLLBACK blocks to use withTx():
  - events.go: AddComment
  - dirty.go: MarkIssuesDirty, ClearDirtyIssuesByID
  - labels.go: executeLabelOperation
  - dependencies.go: AddDependency, RemoveDependency
  - compact.go: ApplyCompaction

All tests pass.

Amp-Thread-ID: https://ampcode.com/threads/T-dfacc972-f6c8-4bb3-8997-faa079b5d070
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-11-02 12:55:47 -08:00
parent 79fa6d2fec
commit 7d6d64d2c1
6 changed files with 166 additions and 192 deletions

View File

@@ -267,51 +267,43 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti
func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error { func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int, compressedSize int, commitHash string) error {
now := time.Now().UTC() now := time.Now().UTC()
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { var commitHashPtr *string
return fmt.Errorf("failed to begin transaction: %w", err) if commitHash != "" {
} commitHashPtr = &commitHash
defer func() { _ = tx.Rollback() }() }
var commitHashPtr *string _, err := tx.ExecContext(ctx, `
if commitHash != "" { UPDATE issues
commitHashPtr = &commitHash SET compaction_level = ?,
} compacted_at = ?,
compacted_at_commit = ?,
_, err = tx.ExecContext(ctx, ` original_size = ?,
UPDATE issues updated_at = ?
SET compaction_level = ?, WHERE id = ?
compacted_at = ?, `, level, now, commitHashPtr, originalSize, now, issueID)
compacted_at_commit = ?,
original_size = ?, if err != nil {
updated_at = ? return fmt.Errorf("failed to apply compaction metadata: %w", err)
WHERE id = ? }
`, level, now, commitHashPtr, originalSize, now, issueID)
reductionPct := 0.0
if err != nil { if originalSize > 0 {
return fmt.Errorf("failed to apply compaction metadata: %w", err) reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100
} }
reductionPct := 0.0 eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`,
if originalSize > 0 { level, originalSize, compressedSize, reductionPct)
reductionPct = (1.0 - float64(compressedSize)/float64(originalSize)) * 100
} _, err = tx.ExecContext(ctx, `
INSERT INTO events (issue_id, event_type, actor, comment)
eventData := fmt.Sprintf(`{"tier":%d,"original_size":%d,"compressed_size":%d,"reduction_pct":%.1f}`, VALUES (?, ?, 'compactor', ?)
level, originalSize, compressedSize, reductionPct) `, issueID, types.EventCompacted, eventData)
_, err = tx.ExecContext(ctx, ` if err != nil {
INSERT INTO events (issue_id, event_type, actor, comment) return fmt.Errorf("failed to record compaction event: %w", err)
VALUES (?, ?, 'compactor', ?) }
`, issueID, types.EventCompacted, eventData)
return nil
if err != nil { })
return fmt.Errorf("failed to record compaction event: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
return nil
} }

View File

@@ -68,13 +68,8 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
dep.CreatedBy = actor dep.CreatedBy = actor
} }
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { // Cycle Detection and Prevention
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
// Cycle Detection and Prevention
// //
// We prevent cycles across ALL dependency types (blocks, related, parent-child, discovered-from) // We prevent cycles across ALL dependency types (blocks, related, parent-child, discovered-from)
// to maintain a directed acyclic graph (DAG). This is critical for: // to maintain a directed acyclic graph (DAG). This is critical for:
@@ -149,54 +144,51 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
return fmt.Errorf("failed to record event: %w", err) return fmt.Errorf("failed to record event: %w", err)
} }
// Mark both issues as dirty for incremental export // Mark both issues as dirty for incremental export
// (dependencies are exported with each issue, so both need updating) // (dependencies are exported with each issue, so both need updating)
if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil { if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil {
return err return err
} }
return tx.Commit() return nil
})
} }
// RemoveDependency removes a dependency // RemoveDependency removes a dependency
func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error { func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { result, err := tx.ExecContext(ctx, `
return fmt.Errorf("failed to begin transaction: %w", err) DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
} `, issueID, dependsOnID)
defer func() { _ = tx.Rollback() }() if err != nil {
return fmt.Errorf("failed to remove dependency: %w", err)
}
result, err := tx.ExecContext(ctx, ` // Check if dependency existed
DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? rowsAffected, err := result.RowsAffected()
`, issueID, dependsOnID) if err != nil {
if err != nil { return fmt.Errorf("failed to check rows affected: %w", err)
return fmt.Errorf("failed to remove dependency: %w", err) }
} if rowsAffected == 0 {
return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID)
}
// Check if dependency existed _, err = tx.ExecContext(ctx, `
rowsAffected, err := result.RowsAffected() INSERT INTO events (issue_id, event_type, actor, comment)
if err != nil { VALUES (?, ?, ?, ?)
return fmt.Errorf("failed to check rows affected: %w", err) `, issueID, types.EventDependencyRemoved, actor,
} fmt.Sprintf("Removed dependency on %s", dependsOnID))
if rowsAffected == 0 { if err != nil {
return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID) return fmt.Errorf("failed to record event: %w", err)
} }
_, err = tx.ExecContext(ctx, ` // Mark both issues as dirty for incremental export
INSERT INTO events (issue_id, event_type, actor, comment) if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil {
VALUES (?, ?, ?, ?) return err
`, issueID, types.EventDependencyRemoved, actor, }
fmt.Sprintf("Removed dependency on %s", dependsOnID))
if err != nil {
return fmt.Errorf("failed to record event: %w", err)
}
// Mark both issues as dirty for incremental export return nil
if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil { })
return err
}
return tx.Commit()
} }
// GetDependenciesWithMetadata returns issues that this issue depends on, including dependency type // GetDependenciesWithMetadata returns issues that this issue depends on, including dependency type

View File

@@ -26,30 +26,26 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string)
return nil return nil
} }
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { now := time.Now()
return fmt.Errorf("failed to begin transaction: %w", err) stmt, err := tx.PrepareContext(ctx, `
} INSERT INTO dirty_issues (issue_id, marked_at)
defer func() { _ = tx.Rollback() }() VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
now := time.Now() `)
stmt, err := tx.PrepareContext(ctx, ` if err != nil {
INSERT INTO dirty_issues (issue_id, marked_at) return fmt.Errorf("failed to prepare statement: %w", err)
VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer func() { _ = stmt.Close() }()
for _, issueID := range issueIDs {
if _, err := stmt.ExecContext(ctx, issueID, now); err != nil {
return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err)
} }
} defer func() { _ = stmt.Close() }()
return tx.Commit() for _, issueID := range issueIDs {
if _, err := stmt.ExecContext(ctx, issueID, now); err != nil {
return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err)
}
}
return nil
})
} }
// GetDirtyIssues returns the list of issue IDs that need to be exported // GetDirtyIssues returns the list of issue IDs that need to be exported
@@ -116,25 +112,21 @@ func (s *SQLiteStorage) ClearDirtyIssuesByID(ctx context.Context, issueIDs []str
return nil return nil
} }
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`)
return fmt.Errorf("failed to begin transaction: %w", err) if err != nil {
} return fmt.Errorf("failed to prepare statement: %w", err)
defer func() { _ = tx.Rollback() }()
stmt, err := tx.PrepareContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`)
if err != nil {
return fmt.Errorf("failed to prepare statement: %w", err)
}
defer func() { _ = stmt.Close() }()
for _, issueID := range issueIDs {
if _, err := stmt.ExecContext(ctx, issueID); err != nil {
return fmt.Errorf("failed to clear dirty issue %s: %w", issueID, err)
} }
} defer func() { _ = stmt.Close() }()
return tx.Commit() for _, issueID := range issueIDs {
if _, err := stmt.ExecContext(ctx, issueID); err != nil {
return fmt.Errorf("failed to clear dirty issue %s: %w", issueID, err)
}
}
return nil
})
} }
// GetDirtyIssueCount returns the count of dirty issues (for monitoring/debugging) // GetDirtyIssueCount returns the count of dirty issues (for monitoring/debugging)

View File

@@ -13,40 +13,36 @@ const limitClause = " LIMIT ?"
// AddComment adds a comment to an issue // AddComment adds a comment to an issue
func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error { func (s *SQLiteStorage) AddComment(ctx context.Context, issueID, actor, comment string) error {
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { _, err := tx.ExecContext(ctx, `
return fmt.Errorf("failed to begin transaction: %w", err) INSERT INTO events (issue_id, event_type, actor, comment)
} VALUES (?, ?, ?, ?)
defer func() { _ = tx.Rollback() }() `, issueID, types.EventCommented, actor, comment)
if err != nil {
return fmt.Errorf("failed to add comment: %w", err)
}
_, err = tx.ExecContext(ctx, ` // Update issue updated_at timestamp
INSERT INTO events (issue_id, event_type, actor, comment) now := time.Now()
VALUES (?, ?, ?, ?) _, err = tx.ExecContext(ctx, `
`, issueID, types.EventCommented, actor, comment) UPDATE issues SET updated_at = ? WHERE id = ?
if err != nil { `, now, issueID)
return fmt.Errorf("failed to add comment: %w", err) if err != nil {
} return fmt.Errorf("failed to update timestamp: %w", err)
}
// Update issue updated_at timestamp // Mark issue as dirty for incremental export
now := time.Now() _, err = tx.ExecContext(ctx, `
_, err = tx.ExecContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at)
UPDATE issues SET updated_at = ? WHERE id = ? VALUES (?, ?)
`, now, issueID) ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
if err != nil { `, issueID, now)
return fmt.Errorf("failed to update timestamp: %w", err) if err != nil {
} return fmt.Errorf("failed to mark issue dirty: %w", err)
}
// Mark issue as dirty for incremental export return nil
_, err = tx.ExecContext(ctx, ` })
INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
`, issueID, now)
if err != nil {
return fmt.Errorf("failed to mark issue dirty: %w", err)
}
return tx.Commit()
} }
// GetEvents returns the event history for an issue // GetEvents returns the event history for an issue

View File

@@ -2,6 +2,7 @@ package sqlite
import ( import (
"context" "context"
"database/sql"
"fmt" "fmt"
"time" "time"
@@ -18,36 +19,32 @@ func (s *SQLiteStorage) executeLabelOperation(
eventComment string, eventComment string,
operationError string, operationError string,
) error { ) error {
tx, err := s.db.BeginTx(ctx, nil) return s.withTx(ctx, func(tx *sql.Tx) error {
if err != nil { _, err := tx.ExecContext(ctx, labelSQL, labelSQLArgs...)
return fmt.Errorf("failed to begin transaction: %w", err) if err != nil {
} return fmt.Errorf("%s: %w", operationError, err)
defer func() { _ = tx.Rollback() }() }
_, err = tx.ExecContext(ctx, labelSQL, labelSQLArgs...) _, err = tx.ExecContext(ctx, `
if err != nil { INSERT INTO events (issue_id, event_type, actor, comment)
return fmt.Errorf("%s: %w", operationError, err) VALUES (?, ?, ?, ?)
} `, issueID, eventType, actor, eventComment)
if err != nil {
return fmt.Errorf("failed to record event: %w", err)
}
_, err = tx.ExecContext(ctx, ` // Mark issue as dirty for incremental export
INSERT INTO events (issue_id, event_type, actor, comment) _, err = tx.ExecContext(ctx, `
VALUES (?, ?, ?, ?) INSERT INTO dirty_issues (issue_id, marked_at)
`, issueID, eventType, actor, eventComment) VALUES (?, ?)
if err != nil { ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
return fmt.Errorf("failed to record event: %w", err) `, issueID, time.Now())
} if err != nil {
return fmt.Errorf("failed to mark issue dirty: %w", err)
}
// Mark issue as dirty for incremental export return nil
_, err = tx.ExecContext(ctx, ` })
INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?)
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
`, issueID, time.Now())
if err != nil {
return fmt.Errorf("failed to mark issue dirty: %w", err)
}
return tx.Commit()
} }
// AddLabel adds a label to an issue // AddLabel adds a label to an issue

View File

@@ -18,10 +18,10 @@ func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) {
return s.db.BeginTx(ctx, nil) return s.db.BeginTx(ctx, nil)
} }
// ExecInTransaction executes a function within a database transaction. // withTx executes a function within a database transaction.
// If the function returns an error, the transaction is rolled back. // If the function returns an error, the transaction is rolled back.
// Otherwise, the transaction is committed. // Otherwise, the transaction is committed.
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error { func (s *SQLiteStorage) withTx(ctx context.Context, fn func(*sql.Tx) error) error {
tx, err := s.db.BeginTx(ctx, nil) tx, err := s.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err) return fmt.Errorf("failed to begin transaction: %w", err)
@@ -39,6 +39,11 @@ func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx)
return nil return nil
} }
// ExecInTransaction is deprecated. Use withTx instead.
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error {
return s.withTx(ctx, fn)
}
// IsUniqueConstraintError checks if an error is a UNIQUE constraint violation // IsUniqueConstraintError checks if an error is a UNIQUE constraint violation
func IsUniqueConstraintError(err error) bool { func IsUniqueConstraintError(err error) bool {
if err == nil { if err == nil {