From 6087cd438b3246b3eca79efd2e43114899901cde Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Thu, 18 Dec 2025 11:27:58 -0800 Subject: [PATCH] fix(storage): race condition when reconnect closes db mid-query (GH#607) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change reconnectMu from sync.Mutex to sync.RWMutex so read operations can hold RLock during database access. This prevents reconnect() from closing the connection while queries are in progress. - GetIssue and SearchIssues now hold RLock during database operations - Close() acquires write lock to coordinate with reconnect - Add TestConcurrentReadsWithReconnect to verify the fix 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/storage/sqlite/freshness_test.go | 141 ++++++++++++++++++++++ internal/storage/sqlite/queries.go | 10 ++ internal/storage/sqlite/store.go | 5 +- 3 files changed, 155 insertions(+), 1 deletion(-) diff --git a/internal/storage/sqlite/freshness_test.go b/internal/storage/sqlite/freshness_test.go index 2cf604f2..f723ffe1 100644 --- a/internal/storage/sqlite/freshness_test.go +++ b/internal/storage/sqlite/freshness_test.go @@ -5,6 +5,7 @@ import ( "database/sql" "os" "path/filepath" + "sync" "syscall" "testing" "time" @@ -810,6 +811,146 @@ func TestBranchMergeNoErroneousDeletion(t *testing.T) { } } +// TestConcurrentReadsWithReconnect verifies the race condition fix from GH#607. +// The race condition was: +// 1. Operation A calls checkFreshness() → no change → proceeds to use s.db +// 2. Operation B calls checkFreshness() → detects change → calls reconnect() +// 3. reconnect() closes s.db while Operation A is still using it +// 4. Operation A fails with "database is closed" +// +// The fix uses sync.RWMutex: +// - Read operations hold RLock during database access +// - reconnect() holds exclusive Lock, waiting for readers to finish +func TestConcurrentReadsWithReconnect(t *testing.T) { + // Create temp directory + tmpDir, err := os.MkdirTemp("", "beads-concurrent-reconnect-*") + if err != nil { + t.Fatalf("failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + dbPath := filepath.Join(tmpDir, "test.db") + ctx := context.Background() + + // Create store + store, err := New(ctx, dbPath) + if err != nil { + t.Fatalf("failed to create store: %v", err) + } + + // Initialize + if err := store.SetConfig(ctx, "issue_prefix", "bd"); err != nil { + store.Close() + t.Fatalf("failed to set issue_prefix: %v", err) + } + + // Create an issue to query + issue := &types.Issue{ + Title: "Concurrent Test Issue", + Status: types.StatusOpen, + Priority: 2, + IssueType: types.TypeTask, + } + if err := store.CreateIssue(ctx, issue, "test"); err != nil { + store.Close() + t.Fatalf("failed to create issue: %v", err) + } + issueID := issue.ID + + // Enable freshness checking + store.EnableFreshnessChecking() + + // Track errors from concurrent operations + const numGoroutines = 50 + const opsPerGoroutine = 100 + errChan := make(chan error, numGoroutines*opsPerGoroutine) + doneChan := make(chan struct{}) + var wg sync.WaitGroup + + // Start goroutines that continuously call GetIssue + for i := range numGoroutines { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for j := range opsPerGoroutine { + select { + case <-doneChan: + return + default: + } + + _, err := store.GetIssue(ctx, issueID) + if err != nil { + errChan <- err + } + + // Occasionally trigger reconnect by touching the file + // This simulates external modifications + if j%20 == 0 && goroutineID == 0 { + // Touch the file to change mtime + now := time.Now() + os.Chtimes(dbPath, now, now) + } + } + }(i) + } + + // Also start a goroutine that forces reconnections + wg.Add(1) + go func() { + defer wg.Done() + for i := range 20 { + select { + case <-doneChan: + return + default: + } + + // Force reconnection by calling it directly + // This simulates what happens when freshness check detects changes + _ = store.reconnect() + + // Small delay between reconnections + time.Sleep(time.Duration(10+i) * time.Millisecond) + } + }() + + // Wait for all operations to complete (max 2 seconds) + time.Sleep(2 * time.Second) + close(doneChan) + + // Wait for all goroutines to finish + wg.Wait() + + // Now safe to close store and error channel + store.Close() + close(errChan) + + // Count errors + var dbClosedErrors int + var otherErrors int + for err := range errChan { + errStr := err.Error() + if errStr == "sql: database is closed" || + errStr == "database is closed" || + errStr == "sql: statement is closed" { + dbClosedErrors++ + } else { + otherErrors++ + t.Logf("Other error: %v", err) + } + } + + if dbClosedErrors > 0 { + t.Errorf("Race condition detected: %d 'database is closed' errors occurred (GH#607 not fixed)", dbClosedErrors) + } + if otherErrors > 0 { + t.Logf("Note: %d non-database-closed errors occurred (may be expected)", otherErrors) + } + t.Logf("Completed %d goroutines × %d ops with %d db closed errors, %d other errors", + numGoroutines, opsPerGoroutine, dbClosedErrors, otherErrors) +} + // BenchmarkGetIssueWithFreshness measures GetIssue with freshness checking enabled. func BenchmarkGetIssueWithFreshness(b *testing.B) { // Create temp directory diff --git a/internal/storage/sqlite/queries.go b/internal/storage/sqlite/queries.go index 6b903447..5939d379 100644 --- a/internal/storage/sqlite/queries.go +++ b/internal/storage/sqlite/queries.go @@ -227,6 +227,11 @@ func (s *SQLiteStorage) GetIssue(ctx context.Context, id string) (*types.Issue, // Check for external database file modifications (daemon mode) s.checkFreshness() + // Hold read lock during database operations to prevent reconnect() from + // closing the connection mid-query (GH#607 race condition fix) + s.reconnectMu.RLock() + defer s.reconnectMu.RUnlock() + var issue types.Issue var closedAt sql.NullTime var estimatedMinutes sql.NullInt64 @@ -1400,6 +1405,11 @@ func (s *SQLiteStorage) SearchIssues(ctx context.Context, query string, filter t // Check for external database file modifications (daemon mode) s.checkFreshness() + // Hold read lock during database operations to prevent reconnect() from + // closing the connection mid-query (GH#607 race condition fix) + s.reconnectMu.RLock() + defer s.reconnectMu.RUnlock() + whereClauses := []string{} args := []interface{}{} diff --git a/internal/storage/sqlite/store.go b/internal/storage/sqlite/store.go index b56d428d..708519f0 100644 --- a/internal/storage/sqlite/store.go +++ b/internal/storage/sqlite/store.go @@ -28,7 +28,7 @@ type SQLiteStorage struct { connStr string // Connection string for reconnection busyTimeout time.Duration freshness *FreshnessChecker // Optional freshness checker for daemon mode - reconnectMu sync.Mutex // Protects reconnection + reconnectMu sync.RWMutex // Protects reconnection and db access (GH#607) } // setupWASMCache configures WASM compilation caching to reduce SQLite startup time. @@ -207,6 +207,9 @@ func NewWithTimeout(ctx context.Context, path string, busyTimeout time.Duration) // It checkpoints the WAL to ensure all writes are flushed to the main database file. func (s *SQLiteStorage) Close() error { s.closed.Store(true) + // Acquire write lock to prevent racing with reconnect() (GH#607) + s.reconnectMu.Lock() + defer s.reconnectMu.Unlock() // Checkpoint WAL to ensure all writes are persisted to the main database file. // Without this, writes may be stranded in the WAL and lost between CLI invocations. _, _ = s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")