fix: prevent parallel execution migration race conditions (GH#720)

When multiple bd commands are run in parallel, they can race during database
migrations, causing "duplicate column name" errors. This happens because:

1. Process A checks if column exists → false
2. Process B checks if column exists → false
3. Process A adds column → succeeds
4. Process B adds column → FAILS (duplicate column)

Changes:
- Wrap RunMigrations in BEGIN EXCLUSIVE transaction to serialize migrations
- Disable foreign keys BEFORE the transaction (PRAGMA must be called outside tx)
- Convert nested BEGIN/COMMIT in migrations 010, 022, 025 to use SAVEPOINTs
  (SQLite does not support nested transactions)
- Remove redundant PRAGMA foreign_keys calls from individual migrations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-23 00:55:05 -08:00
parent e79558a972
commit d677554ef3
4 changed files with 102 additions and 54 deletions

View File

@@ -101,8 +101,36 @@ func getMigrationDescription(name string) string {
return "Unknown migration"
}
// RunMigrations executes all registered migrations in order with invariant checking
// RunMigrations executes all registered migrations in order with invariant checking.
// Uses EXCLUSIVE transaction to prevent race conditions when multiple processes
// open the database simultaneously (GH#720).
func RunMigrations(db *sql.DB) error {
// Disable foreign keys BEFORE starting the transaction.
// PRAGMA foreign_keys must be called when no transaction is active (SQLite limitation).
// Some migrations (022, 025) drop/recreate tables and need foreign keys off
// to prevent ON DELETE CASCADE from deleting related data.
_, err := db.Exec("PRAGMA foreign_keys = OFF")
if err != nil {
return fmt.Errorf("failed to disable foreign keys for migrations: %w", err)
}
defer func() { _, _ = db.Exec("PRAGMA foreign_keys = ON") }()
// Acquire EXCLUSIVE lock to serialize migrations across processes.
// Without this, parallel processes can race on check-then-modify operations
// (e.g., checking if a column exists then adding it), causing "duplicate column" errors.
_, err = db.Exec("BEGIN EXCLUSIVE")
if err != nil {
return fmt.Errorf("failed to acquire exclusive lock for migrations: %w", err)
}
// Ensure we release the lock on any exit path
committed := false
defer func() {
if !committed {
_, _ = db.Exec("ROLLBACK")
}
}()
snapshot, err := captureSnapshot(db)
if err != nil {
return fmt.Errorf("failed to capture pre-migration snapshot: %w", err)
@@ -118,5 +146,11 @@ func RunMigrations(db *sql.DB) error {
return fmt.Errorf("post-migration validation failed: %w", err)
}
// Commit the transaction
if _, err := db.Exec("COMMIT"); err != nil {
return fmt.Errorf("failed to commit migrations: %w", err)
}
committed = true
return nil
}

View File

@@ -61,13 +61,20 @@ func MigrateContentHashColumn(db *sql.DB) error {
return fmt.Errorf("error iterating issues: %w", err)
}
tx, err := db.Begin()
// Use SAVEPOINT for atomicity (we're already inside an EXCLUSIVE transaction from RunMigrations)
// SQLite doesn't support nested transactions but SAVEPOINTs work inside transactions
_, err = db.Exec(`SAVEPOINT content_hash_migration`)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
return fmt.Errorf("failed to create savepoint: %w", err)
}
defer tx.Rollback()
savepointReleased := false
defer func() {
if !savepointReleased {
_, _ = db.Exec(`ROLLBACK TO SAVEPOINT content_hash_migration`)
}
}()
stmt, err := tx.Prepare(`UPDATE issues SET content_hash = ? WHERE id = ?`)
stmt, err := db.Prepare(`UPDATE issues SET content_hash = ? WHERE id = ?`)
if err != nil {
return fmt.Errorf("failed to prepare update statement: %w", err)
}
@@ -79,9 +86,12 @@ func MigrateContentHashColumn(db *sql.DB) error {
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
// Release savepoint (commits the changes within the outer transaction)
_, err = db.Exec(`RELEASE SAVEPOINT content_hash_migration`)
if err != nil {
return fmt.Errorf("failed to release savepoint: %w", err)
}
savepointReleased = true
return nil
}

View File

@@ -60,19 +60,10 @@ func MigrateDropEdgeColumns(db *sql.DB) error {
// SQLite 3.35.0+ supports DROP COLUMN, but we use table recreation for compatibility
// This is idempotent - we recreate the table without the deprecated columns
// CRITICAL: Disable foreign keys to prevent CASCADE deletes when we drop the issues table
// The dependencies table has FOREIGN KEY (depends_on_id) REFERENCES issues(id) ON DELETE CASCADE
// Without disabling foreign keys, dropping the issues table would delete all dependencies!
_, err = db.Exec(`PRAGMA foreign_keys = OFF`)
if err != nil {
return fmt.Errorf("failed to disable foreign keys: %w", err)
}
// Re-enable foreign keys at the end (deferred to ensure it runs)
defer func() {
_, _ = db.Exec(`PRAGMA foreign_keys = ON`)
}()
// NOTE: Foreign keys are disabled in RunMigrations() before the EXCLUSIVE transaction starts.
// This prevents ON DELETE CASCADE from deleting dependencies when we drop/recreate the issues table.
// Drop views that depend on the issues table BEFORE starting transaction
// Drop views that depend on the issues table BEFORE starting savepoint
// This is necessary because SQLite validates views during table operations
_, err = db.Exec(`DROP VIEW IF EXISTS ready_issues`)
if err != nil {
@@ -83,15 +74,21 @@ func MigrateDropEdgeColumns(db *sql.DB) error {
return fmt.Errorf("failed to drop blocked_issues view: %w", err)
}
// Start a transaction for atomicity
tx, err := db.Begin()
// Use SAVEPOINT for atomicity (we're already inside an EXCLUSIVE transaction from RunMigrations)
// SQLite doesn't support nested transactions but SAVEPOINTs work inside transactions
_, err = db.Exec(`SAVEPOINT drop_edge_columns`)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
return fmt.Errorf("failed to create savepoint: %w", err)
}
defer tx.Rollback()
savepointReleased := false
defer func() {
if !savepointReleased {
_, _ = db.Exec(`ROLLBACK TO SAVEPOINT drop_edge_columns`)
}
}()
// Create new table without the deprecated columns
_, err = tx.Exec(`
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS issues_new (
id TEXT PRIMARY KEY,
content_hash TEXT,
@@ -129,7 +126,7 @@ func MigrateDropEdgeColumns(db *sql.DB) error {
}
// Copy data from old table to new table (excluding deprecated columns)
_, err = tx.Exec(`
_, err = db.Exec(`
INSERT INTO issues_new (
id, content_hash, title, description, design, acceptance_criteria,
notes, status, priority, issue_type, assignee, estimated_minutes,
@@ -151,13 +148,13 @@ func MigrateDropEdgeColumns(db *sql.DB) error {
}
// Drop old table
_, err = tx.Exec(`DROP TABLE issues`)
_, err = db.Exec(`DROP TABLE issues`)
if err != nil {
return fmt.Errorf("failed to drop old issues table: %w", err)
}
// Rename new table to issues
_, err = tx.Exec(`ALTER TABLE issues_new RENAME TO issues`)
_, err = db.Exec(`ALTER TABLE issues_new RENAME TO issues`)
if err != nil {
return fmt.Errorf("failed to rename new issues table: %w", err)
}
@@ -172,16 +169,18 @@ func MigrateDropEdgeColumns(db *sql.DB) error {
}
for _, idx := range indexes {
_, err = tx.Exec(idx)
_, err = db.Exec(idx)
if err != nil {
return fmt.Errorf("failed to create index: %w", err)
}
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit migration: %w", err)
// Release savepoint (commits the changes within the outer transaction)
_, err = db.Exec(`RELEASE SAVEPOINT drop_edge_columns`)
if err != nil {
return fmt.Errorf("failed to release savepoint: %w", err)
}
savepointReleased = true
// Recreate views that we dropped earlier (after commit, outside transaction)
// ready_issues view

View File

@@ -8,36 +8,35 @@ import (
// to allow external dependencies (external:<project>:<capability>).
// See bd-zmmy for design context.
func MigrateRemoveDependsOnFK(db *sql.DB) error {
// Disable foreign keys for table recreation
if _, err := db.Exec(`PRAGMA foreign_keys = OFF`); err != nil {
return err
}
defer func() { _, _ = db.Exec(`PRAGMA foreign_keys = ON`) }()
// NOTE: Foreign keys are disabled in RunMigrations() before the EXCLUSIVE transaction starts.
// This prevents ON DELETE CASCADE from deleting data when we drop/recreate the dependencies table.
// Begin transaction for atomic table recreation
tx, err := db.Begin()
// Use SAVEPOINT for atomicity (we're already inside an EXCLUSIVE transaction from RunMigrations)
// SQLite doesn't support nested transactions but SAVEPOINTs work inside transactions
_, err := db.Exec(`SAVEPOINT remove_depends_on_fk`)
if err != nil {
return err
}
savepointReleased := false
defer func() {
if err != nil {
_ = tx.Rollback()
if !savepointReleased {
_, _ = db.Exec(`ROLLBACK TO SAVEPOINT remove_depends_on_fk`)
}
}()
// Drop views that depend on the dependencies table
// They will be recreated after the table is rebuilt
if _, err = tx.Exec(`DROP VIEW IF EXISTS ready_issues`); err != nil {
if _, err = db.Exec(`DROP VIEW IF EXISTS ready_issues`); err != nil {
return err
}
if _, err = tx.Exec(`DROP VIEW IF EXISTS blocked_issues`); err != nil {
if _, err = db.Exec(`DROP VIEW IF EXISTS blocked_issues`); err != nil {
return err
}
// Create new table without FK on depends_on_id
// Keep FK on issue_id (source must exist)
// Remove FK on depends_on_id (target can be external ref)
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE TABLE dependencies_new (
issue_id TEXT NOT NULL,
depends_on_id TEXT NOT NULL,
@@ -54,7 +53,7 @@ func MigrateRemoveDependsOnFK(db *sql.DB) error {
}
// Copy data from old table
if _, err = tx.Exec(`
if _, err = db.Exec(`
INSERT INTO dependencies_new
SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
FROM dependencies
@@ -63,48 +62,48 @@ func MigrateRemoveDependsOnFK(db *sql.DB) error {
}
// Drop old table
if _, err = tx.Exec(`DROP TABLE dependencies`); err != nil {
if _, err = db.Exec(`DROP TABLE dependencies`); err != nil {
return err
}
// Rename new table
if _, err = tx.Exec(`ALTER TABLE dependencies_new RENAME TO dependencies`); err != nil {
if _, err = db.Exec(`ALTER TABLE dependencies_new RENAME TO dependencies`); err != nil {
return err
}
// Recreate indexes
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE INDEX idx_dependencies_issue_id ON dependencies(issue_id)
`); err != nil {
return err
}
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE INDEX idx_dependencies_depends_on ON dependencies(depends_on_id)
`); err != nil {
return err
}
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE INDEX idx_dependencies_type ON dependencies(type)
`); err != nil {
return err
}
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE INDEX idx_dependencies_depends_on_type ON dependencies(depends_on_id, type)
`); err != nil {
return err
}
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE INDEX idx_dependencies_depends_on_type_issue ON dependencies(depends_on_id, type, issue_id)
`); err != nil {
return err
}
// Recreate views
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE VIEW IF NOT EXISTS ready_issues AS
WITH RECURSIVE
blocked_directly AS (
@@ -134,7 +133,7 @@ func MigrateRemoveDependsOnFK(db *sql.DB) error {
return err
}
if _, err = tx.Exec(`
if _, err = db.Exec(`
CREATE VIEW IF NOT EXISTS blocked_issues AS
SELECT
i.*,
@@ -150,5 +149,11 @@ func MigrateRemoveDependsOnFK(db *sql.DB) error {
return err
}
return tx.Commit()
// Release savepoint (commits the changes within the outer transaction)
_, err = db.Exec(`RELEASE SAVEPOINT remove_depends_on_fk`)
if err != nil {
return err
}
savepointReleased = true
return nil
}