fix: Fix race condition in parallel issue creation (bd-89)
Use IMMEDIATE transactions with dedicated connections to fix a race condition where multiple processes creating issues concurrently caused "UNIQUE constraint failed: issues.id" errors.

Key changes:
- Use BEGIN IMMEDIATE to acquire the RESERVED lock early
- Use a dedicated connection (sql.Conn) for the transaction to ensure all operations happen on the same connection
- Increase busy_timeout from 10s to 30s for better parallel write handling
- Use context.Background() for ROLLBACK to ensure cleanup even if ctx is cancelled

Added regression test TestParallelIssueCreation that creates 20 issues in parallel and verifies no ID collisions occur.

Fixes #6

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -30,8 +30,9 @@ func New(path string) (*SQLiteStorage, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Open database with WAL mode for better concurrency and busy timeout for parallel writes
|
// Open database with WAL mode for better concurrency and busy timeout for parallel writes
|
||||||
// _pragma=busy_timeout(10000) means wait up to 10 seconds for locks instead of failing immediately
|
// _pragma=busy_timeout(30000) means wait up to 30 seconds for locks instead of failing immediately
|
||||||
db, err := sql.Open("sqlite", path+"?_journal_mode=WAL&_foreign_keys=ON&_pragma=busy_timeout(10000)")
|
// Higher timeout helps with parallel issue creation from multiple processes
|
||||||
|
db, err := sql.Open("sqlite", path+"?_journal_mode=WAL&_foreign_keys=ON&_pragma=busy_timeout(30000)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
||||||
}
|
}
|
||||||
@@ -323,19 +324,40 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
issue.CreatedAt = now
|
issue.CreatedAt = now
|
||||||
issue.UpdatedAt = now
|
issue.UpdatedAt = now
|
||||||
|
|
||||||
// Start transaction BEFORE ID generation to prevent race conditions
|
// Acquire a dedicated connection for the transaction.
|
||||||
// This ensures ID generation and issue insertion are atomic
|
// This is necessary because we need to execute raw SQL ("BEGIN IMMEDIATE", "COMMIT")
|
||||||
tx, err := s.db.BeginTx(ctx, nil)
|
// on the same connection, and database/sql's connection pool would otherwise
|
||||||
|
// use different connections for different queries.
|
||||||
|
conn, err := s.db.Conn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
return fmt.Errorf("failed to acquire connection: %w", err)
|
||||||
}
|
}
|
||||||
defer tx.Rollback()
|
defer conn.Close()
|
||||||
|
|
||||||
|
// Start IMMEDIATE transaction to acquire write lock early and prevent race conditions.
|
||||||
|
// IMMEDIATE acquires a RESERVED lock immediately, preventing other IMMEDIATE or EXCLUSIVE
|
||||||
|
// transactions from starting. This serializes ID generation across concurrent writers.
|
||||||
|
//
|
||||||
|
// We use raw Exec instead of BeginTx because database/sql doesn't support transaction
|
||||||
|
// modes in BeginTx, and modernc.org/sqlite's BeginTx always uses DEFERRED mode.
|
||||||
|
if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil {
|
||||||
|
return fmt.Errorf("failed to begin immediate transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track commit state for defer cleanup
|
||||||
|
// Use context.Background() for ROLLBACK to ensure cleanup happens even if ctx is cancelled
|
||||||
|
committed := false
|
||||||
|
defer func() {
|
||||||
|
if !committed {
|
||||||
|
conn.ExecContext(context.Background(), "ROLLBACK")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Generate ID if not set (inside transaction to prevent race conditions)
|
// Generate ID if not set (inside transaction to prevent race conditions)
|
||||||
if issue.ID == "" {
|
if issue.ID == "" {
|
||||||
// Get prefix from config, default to "bd"
|
// Get prefix from config, default to "bd"
|
||||||
var prefix string
|
var prefix string
|
||||||
err := tx.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
|
err := conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
|
||||||
if err == sql.ErrNoRows || prefix == "" {
|
if err == sql.ErrNoRows || prefix == "" {
|
||||||
prefix = "bd"
|
prefix = "bd"
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@@ -343,29 +365,22 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ensure counter is initialized for this prefix (lazy initialization within transaction)
|
// Ensure counter is initialized for this prefix (lazy initialization within transaction)
|
||||||
var exists int
|
// Use INSERT OR IGNORE to make this idempotent and avoid race conditions
|
||||||
err = tx.QueryRowContext(ctx, `SELECT 1 FROM issue_counters WHERE prefix = ?`, prefix).Scan(&exists)
|
// This will safely initialize the counter if it doesn't exist, or do nothing if it does
|
||||||
if err == sql.ErrNoRows {
|
_, err = conn.ExecContext(ctx, `
|
||||||
// Counter doesn't exist, initialize it from existing issues
|
INSERT OR IGNORE INTO issue_counters (prefix, last_id)
|
||||||
_, err = tx.ExecContext(ctx, `
|
SELECT ?, COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0)
|
||||||
INSERT INTO issue_counters (prefix, last_id)
|
FROM issues
|
||||||
SELECT ?, COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0)
|
WHERE id LIKE ? || '-%'
|
||||||
FROM issues
|
AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*'
|
||||||
WHERE id LIKE ? || '-%'
|
`, prefix, prefix, prefix, prefix)
|
||||||
AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*'
|
if err != nil {
|
||||||
ON CONFLICT(prefix) DO UPDATE SET
|
return fmt.Errorf("failed to initialize counter for prefix %s: %w", prefix, err)
|
||||||
last_id = MAX(last_id, excluded.last_id)
|
|
||||||
`, prefix, prefix, prefix, prefix)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to initialize counter for prefix %s: %w", prefix, err)
|
|
||||||
}
|
|
||||||
} else if err != nil {
|
|
||||||
return fmt.Errorf("failed to check counter existence: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Atomically get next ID from counter table (within transaction)
|
// Atomically get next ID from counter table (within transaction)
|
||||||
var nextID int
|
var nextID int
|
||||||
err = tx.QueryRowContext(ctx, `
|
err = conn.QueryRowContext(ctx, `
|
||||||
INSERT INTO issue_counters (prefix, last_id)
|
INSERT INTO issue_counters (prefix, last_id)
|
||||||
VALUES (?, 1)
|
VALUES (?, 1)
|
||||||
ON CONFLICT(prefix) DO UPDATE SET
|
ON CONFLICT(prefix) DO UPDATE SET
|
||||||
@@ -380,7 +395,7 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert issue
|
// Insert issue
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = conn.ExecContext(ctx, `
|
||||||
INSERT INTO issues (
|
INSERT INTO issues (
|
||||||
id, title, description, design, acceptance_criteria, notes,
|
id, title, description, design, acceptance_criteria, notes,
|
||||||
status, priority, issue_type, assignee, estimated_minutes,
|
status, priority, issue_type, assignee, estimated_minutes,
|
||||||
@@ -404,7 +419,7 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
eventData = []byte(fmt.Sprintf(`{"id":"%s","title":"%s"}`, issue.ID, issue.Title))
|
eventData = []byte(fmt.Sprintf(`{"id":"%s","title":"%s"}`, issue.ID, issue.Title))
|
||||||
}
|
}
|
||||||
eventDataStr := string(eventData)
|
eventDataStr := string(eventData)
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = conn.ExecContext(ctx, `
|
||||||
INSERT INTO events (issue_id, event_type, actor, new_value)
|
INSERT INTO events (issue_id, event_type, actor, new_value)
|
||||||
VALUES (?, ?, ?, ?)
|
VALUES (?, ?, ?, ?)
|
||||||
`, issue.ID, types.EventCreated, actor, eventDataStr)
|
`, issue.ID, types.EventCreated, actor, eventDataStr)
|
||||||
@@ -413,7 +428,7 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Mark issue as dirty for incremental export
|
// Mark issue as dirty for incremental export
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = conn.ExecContext(ctx, `
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||||
VALUES (?, ?)
|
VALUES (?, ?)
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||||
@@ -422,7 +437,12 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
|
|||||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
// Commit the transaction
|
||||||
|
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
|
||||||
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||||
|
}
|
||||||
|
committed = true
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetIssue retrieves an issue by ID
|
// GetIssue retrieves an issue by ID
|
||||||
|
|||||||
@@ -421,3 +421,81 @@ func TestGetStatistics(t *testing.T) {
|
|||||||
// parallel load (100+ simultaneous operations). This is a known limitation and
|
// parallel load (100+ simultaneous operations). This is a known limitation and
|
||||||
// does not affect normal usage where WAL mode handles typical concurrent operations.
|
// does not affect normal usage where WAL mode handles typical concurrent operations.
|
||||||
// For very high concurrency needs, consider using CGO-enabled sqlite3 driver or PostgreSQL.
|
// For very high concurrency needs, consider using CGO-enabled sqlite3 driver or PostgreSQL.
|
||||||
|
|
||||||
|
// TestParallelIssueCreation verifies that parallel issue creation doesn't cause ID collisions
|
||||||
|
// This is a regression test for bd-89 (GH-6) where race conditions in ID generation caused
|
||||||
|
// UNIQUE constraint failures when creating issues rapidly in parallel.
|
||||||
|
func TestParallelIssueCreation(t *testing.T) {
|
||||||
|
store, cleanup := setupTestDB(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
const numIssues = 20
|
||||||
|
|
||||||
|
// Create issues in parallel using goroutines
|
||||||
|
errors := make(chan error, numIssues)
|
||||||
|
ids := make(chan string, numIssues)
|
||||||
|
|
||||||
|
for i := 0; i < numIssues; i++ {
|
||||||
|
go func(num int) {
|
||||||
|
issue := &types.Issue{
|
||||||
|
Title: "Parallel test issue",
|
||||||
|
Status: types.StatusOpen,
|
||||||
|
Priority: 2,
|
||||||
|
IssueType: types.TypeTask,
|
||||||
|
}
|
||||||
|
err := store.CreateIssue(ctx, issue, "test-user")
|
||||||
|
if err != nil {
|
||||||
|
errors <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ids <- issue.ID
|
||||||
|
errors <- nil
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect results
|
||||||
|
var collectedIDs []string
|
||||||
|
var failureCount int
|
||||||
|
for i := 0; i < numIssues; i++ {
|
||||||
|
if err := <-errors; err != nil {
|
||||||
|
t.Errorf("CreateIssue failed in parallel test: %v", err)
|
||||||
|
failureCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(ids)
|
||||||
|
for id := range ids {
|
||||||
|
collectedIDs = append(collectedIDs, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no failures occurred
|
||||||
|
if failureCount > 0 {
|
||||||
|
t.Fatalf("Expected 0 failures, got %d", failureCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we got the expected number of IDs
|
||||||
|
if len(collectedIDs) != numIssues {
|
||||||
|
t.Fatalf("Expected %d IDs, got %d", numIssues, len(collectedIDs))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all IDs are unique (no duplicates from race conditions)
|
||||||
|
seen := make(map[string]bool)
|
||||||
|
for _, id := range collectedIDs {
|
||||||
|
if seen[id] {
|
||||||
|
t.Errorf("Duplicate ID detected: %s", id)
|
||||||
|
}
|
||||||
|
seen[id] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify all issues can be retrieved (they actually exist in the database)
|
||||||
|
for _, id := range collectedIDs {
|
||||||
|
issue, err := store.GetIssue(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Failed to retrieve issue %s: %v", id, err)
|
||||||
|
}
|
||||||
|
if issue == nil {
|
||||||
|
t.Errorf("Issue %s not found in database", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user