fix(storage): race condition when reconnect closes db mid-query (GH#607)
Change reconnectMu from sync.Mutex to sync.RWMutex so read operations can hold RLock during database access. This prevents reconnect() from closing the connection while queries are in progress.

- GetIssue and SearchIssues now hold RLock during database operations
- Close() acquires the write lock to coordinate with reconnect()
- Add TestConcurrentReadsWithReconnect to verify the fix

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
	"database/sql"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"testing"
	"time"
||||
@@ -810,6 +811,146 @@ func TestBranchMergeNoErroneousDeletion(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestConcurrentReadsWithReconnect verifies the race condition fix from GH#607.
|
||||
// The race condition was:
|
||||
// 1. Operation A calls checkFreshness() → no change → proceeds to use s.db
|
||||
// 2. Operation B calls checkFreshness() → detects change → calls reconnect()
|
||||
// 3. reconnect() closes s.db while Operation A is still using it
|
||||
// 4. Operation A fails with "database is closed"
|
||||
//
|
||||
// The fix uses sync.RWMutex:
|
||||
// - Read operations hold RLock during database access
|
||||
// - reconnect() holds exclusive Lock, waiting for readers to finish
|
||||
func TestConcurrentReadsWithReconnect(t *testing.T) {
|
||||
// Create temp directory
|
||||
tmpDir, err := os.MkdirTemp("", "beads-concurrent-reconnect-*")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temp dir: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
dbPath := filepath.Join(tmpDir, "test.db")
|
||||
ctx := context.Background()
|
||||
|
||||
// Create store
|
||||
store, err := New(ctx, dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create store: %v", err)
|
||||
}
|
||||
|
||||
// Initialize
|
||||
if err := store.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
|
||||
store.Close()
|
||||
t.Fatalf("failed to set issue_prefix: %v", err)
|
||||
}
|
||||
|
||||
// Create an issue to query
|
||||
issue := &types.Issue{
|
||||
Title: "Concurrent Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue, "test"); err != nil {
|
||||
store.Close()
|
||||
t.Fatalf("failed to create issue: %v", err)
|
||||
}
|
||||
issueID := issue.ID
|
||||
|
||||
// Enable freshness checking
|
||||
store.EnableFreshnessChecking()
|
||||
|
||||
// Track errors from concurrent operations
|
||||
const numGoroutines = 50
|
||||
const opsPerGoroutine = 100
|
||||
errChan := make(chan error, numGoroutines*opsPerGoroutine)
|
||||
doneChan := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Start goroutines that continuously call GetIssue
|
||||
for i := range numGoroutines {
|
||||
wg.Add(1)
|
||||
go func(goroutineID int) {
|
||||
defer wg.Done()
|
||||
for j := range opsPerGoroutine {
|
||||
select {
|
||||
case <-doneChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
_, err := store.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
// Occasionally trigger reconnect by touching the file
|
||||
// This simulates external modifications
|
||||
if j%20 == 0 && goroutineID == 0 {
|
||||
// Touch the file to change mtime
|
||||
now := time.Now()
|
||||
os.Chtimes(dbPath, now, now)
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Also start a goroutine that forces reconnections
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := range 20 {
|
||||
select {
|
||||
case <-doneChan:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Force reconnection by calling it directly
|
||||
// This simulates what happens when freshness check detects changes
|
||||
_ = store.reconnect()
|
||||
|
||||
// Small delay between reconnections
|
||||
time.Sleep(time.Duration(10+i) * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for all operations to complete (max 2 seconds)
|
||||
time.Sleep(2 * time.Second)
|
||||
close(doneChan)
|
||||
|
||||
// Wait for all goroutines to finish
|
||||
wg.Wait()
|
||||
|
||||
// Now safe to close store and error channel
|
||||
store.Close()
|
||||
close(errChan)
|
||||
|
||||
// Count errors
|
||||
var dbClosedErrors int
|
||||
var otherErrors int
|
||||
for err := range errChan {
|
||||
errStr := err.Error()
|
||||
if errStr == "sql: database is closed" ||
|
||||
errStr == "database is closed" ||
|
||||
errStr == "sql: statement is closed" {
|
||||
dbClosedErrors++
|
||||
} else {
|
||||
otherErrors++
|
||||
t.Logf("Other error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if dbClosedErrors > 0 {
|
||||
t.Errorf("Race condition detected: %d 'database is closed' errors occurred (GH#607 not fixed)", dbClosedErrors)
|
||||
}
|
||||
if otherErrors > 0 {
|
||||
t.Logf("Note: %d non-database-closed errors occurred (may be expected)", otherErrors)
|
||||
}
|
||||
t.Logf("Completed %d goroutines × %d ops with %d db closed errors, %d other errors",
|
||||
numGoroutines, opsPerGoroutine, dbClosedErrors, otherErrors)
|
||||
}
|
||||
|
||||
// BenchmarkGetIssueWithFreshness measures GetIssue with freshness checking enabled.
|
||||
func BenchmarkGetIssueWithFreshness(b *testing.B) {
|
||||
// Create temp directory
|
||||
|
||||
@@ -227,6 +227,11 @@ func (s *SQLiteStorage) GetIssue(ctx context.Context, id string) (*types.Issue,
|
||||
// Check for external database file modifications (daemon mode)
|
||||
s.checkFreshness()
|
||||
|
||||
// Hold read lock during database operations to prevent reconnect() from
|
||||
// closing the connection mid-query (GH#607 race condition fix)
|
||||
s.reconnectMu.RLock()
|
||||
defer s.reconnectMu.RUnlock()
|
||||
|
||||
var issue types.Issue
|
||||
var closedAt sql.NullTime
|
||||
var estimatedMinutes sql.NullInt64
|
||||
@@ -1400,6 +1405,11 @@ func (s *SQLiteStorage) SearchIssues(ctx context.Context, query string, filter t
|
||||
// Check for external database file modifications (daemon mode)
|
||||
s.checkFreshness()
|
||||
|
||||
// Hold read lock during database operations to prevent reconnect() from
|
||||
// closing the connection mid-query (GH#607 race condition fix)
|
||||
s.reconnectMu.RLock()
|
||||
defer s.reconnectMu.RUnlock()
|
||||
|
||||
whereClauses := []string{}
|
||||
args := []interface{}{}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ type SQLiteStorage struct {
|
||||
connStr string // Connection string for reconnection
|
||||
busyTimeout time.Duration
|
||||
freshness *FreshnessChecker // Optional freshness checker for daemon mode
|
||||
reconnectMu sync.Mutex // Protects reconnection
|
||||
reconnectMu sync.RWMutex // Protects reconnection and db access (GH#607)
|
||||
}
|
||||
|
||||
// setupWASMCache configures WASM compilation caching to reduce SQLite startup time.
|
||||
@@ -207,6 +207,9 @@ func NewWithTimeout(ctx context.Context, path string, busyTimeout time.Duration)
|
||||
// It checkpoints the WAL to ensure all writes are flushed to the main database file.
|
||||
func (s *SQLiteStorage) Close() error {
|
||||
s.closed.Store(true)
|
||||
// Acquire write lock to prevent racing with reconnect() (GH#607)
|
||||
s.reconnectMu.Lock()
|
||||
defer s.reconnectMu.Unlock()
|
||||
// Checkpoint WAL to ensure all writes are persisted to the main database file.
|
||||
// Without this, writes may be stranded in the WAL and lost between CLI invocations.
|
||||
_, _ = s.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)")
|
||||
|
||||
Reference in New Issue
Block a user