// Package dolt provides concurrency tests for embedded Dolt with multiple writers. // // These tests validate that Gas Town can safely run multiple polecats concurrently, // all writing to the same Dolt DB for creating issues, updating status, // adding dependencies, and closing issues. package dolt import ( "context" "fmt" "sync" "sync/atomic" "testing" "time" "github.com/steveyegge/beads/internal/storage" "github.com/steveyegge/beads/internal/types" ) // concurrentTestTimeout is longer than regular tests to allow for contention const concurrentTestTimeout = 60 * time.Second // concurrentTestContext returns a context with timeout for concurrent test operations func concurrentTestContext(t *testing.T) (context.Context, context.CancelFunc) { t.Helper() return context.WithTimeout(context.Background(), concurrentTestTimeout) } // ============================================================================= // Test 1: Concurrent Issue Creation // 10 goroutines create issues simultaneously. // Verify: All 10 issues created, no duplicates, no errors // ============================================================================= func TestConcurrentIssueCreation(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() const numGoroutines = 10 var wg sync.WaitGroup errors := make(chan error, numGoroutines) createdIDs := make(chan string, numGoroutines) // Launch 10 goroutines to create issues simultaneously for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(n int) { defer wg.Done() issue := &types.Issue{ Title: fmt.Sprintf("Concurrent Issue %d", n), Description: fmt.Sprintf("Created by goroutine %d", n), Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, fmt.Sprintf("worker-%d", n)); err != nil { errors <- fmt.Errorf("goroutine %d: %w", n, err) return } createdIDs <- issue.ID }(i) } wg.Wait() close(errors) close(createdIDs) // Check for errors var errCount int for err := range errors { t.Errorf("creation error: %v", err) errCount++ } if errCount > 0 { t.Fatalf("%d goroutines failed to create issues", errCount) } // Collect and verify all IDs are unique ids := make(map[string]bool) for id := range createdIDs { if ids[id] { t.Errorf("duplicate issue ID: %s", id) } ids[id] = true } if len(ids) != numGoroutines { t.Errorf("expected %d unique IDs, got %d", numGoroutines, len(ids)) } // Verify all issues can be retrieved for id := range ids { issue, err := store.GetIssue(ctx, id) if err != nil { t.Errorf("failed to get issue %s: %v", id, err) continue } if issue == nil { t.Errorf("issue %s not found", id) } } } // ============================================================================= // Test 2: Same-Issue Update Race // 10 goroutines update the same issue simultaneously. // Verify: No errors, final state is consistent // ============================================================================= func TestSameIssueUpdateRace(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create the issue to be updated issue := &types.Issue{ ID: "test-race-issue", Title: "Race Test Issue", Description: "Original description", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue: %v", err) } const numGoroutines = 10 var wg sync.WaitGroup var successCount atomic.Int32 var errorCount atomic.Int32 // Launch 10 goroutines to update the same issue for i := 0; i < numGoroutines; i++ { wg.Add(1) go func(n int) { defer wg.Done() updates := map[string]interface{}{ "description": fmt.Sprintf("Updated by goroutine %d", n), "priority": (n % 4) + 1, // Keep priority in valid range 1-4 } if err := store.UpdateIssue(ctx, issue.ID, updates, fmt.Sprintf("worker-%d", n)); err != nil { t.Logf("goroutine %d update error (may be expected): %v", n, err) errorCount.Add(1) return } successCount.Add(1) }(i) } wg.Wait() // At least some updates should succeed if successCount.Load() == 0 { t.Error("no updates succeeded - expected at least one to complete") } t.Logf("Update results: %d succeeded, %d failed", successCount.Load(), errorCount.Load()) // Verify final state is consistent (can be read without error) retrieved, err := store.GetIssue(ctx, issue.ID) if err != nil { t.Fatalf("failed to get issue after updates: %v", err) } if retrieved == nil { t.Fatal("issue not found after updates") } // The description should be from one of the goroutines t.Logf("Final state - description: %q, priority: %d", retrieved.Description, retrieved.Priority) } // ============================================================================= // Test 3: Read-Write Mix // 5 readers, 5 writers operating concurrently for 100 iterations each. // Verify: No deadlocks, reads return consistent state // ============================================================================= func TestReadWriteMix(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create initial set of issues const numIssues = 5 issueIDs := make([]string, numIssues) for i := 0; i < numIssues; i++ { issue := &types.Issue{ ID: fmt.Sprintf("test-rw-%d", i), Title: fmt.Sprintf("Read-Write Test Issue %d", i), Description: fmt.Sprintf("Issue %d for concurrent read/write testing", i), Status: types.StatusOpen, Priority: (i % 4) + 1, // Keep priority in valid range 1-4 IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue %d: %v", i, err) } issueIDs[i] = issue.ID } const numReaders = 5 const numWriters = 5 const iterations = 100 var wg sync.WaitGroup var readErrors atomic.Int32 var writeErrors atomic.Int32 var readSuccess atomic.Int32 var writeSuccess atomic.Int32 // Start readers for r := 0; r < numReaders; r++ { wg.Add(1) go func(readerID int) { defer wg.Done() for i := 0; i < iterations; i++ { issueID := issueIDs[i%numIssues] issue, err := store.GetIssue(ctx, issueID) if err != nil { readErrors.Add(1) continue } if issue == nil { readErrors.Add(1) continue } readSuccess.Add(1) } }(r) } // Start writers for w := 0; w < numWriters; w++ { wg.Add(1) go func(writerID int) { defer wg.Done() for i := 0; i < iterations; i++ { issueID := issueIDs[i%numIssues] updates := map[string]interface{}{ "notes": fmt.Sprintf("Updated by writer %d, iteration %d", writerID, i), } if err := store.UpdateIssue(ctx, issueID, updates, fmt.Sprintf("writer-%d", writerID)); err != nil { writeErrors.Add(1) continue } writeSuccess.Add(1) } }(w) } // Wait with timeout to detect deadlocks done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: // Success - no deadlock case <-ctx.Done(): t.Fatal("timeout - possible deadlock detected") } t.Logf("Read results: %d succeeded, %d failed", readSuccess.Load(), readErrors.Load()) t.Logf("Write results: %d succeeded, %d failed", writeSuccess.Load(), writeErrors.Load()) // Most reads should succeed expectedReads := int32(numReaders * iterations) if readSuccess.Load() < expectedReads/2 { t.Errorf("too many read failures: %d/%d succeeded", readSuccess.Load(), expectedReads) } // Some writes should succeed (contention is expected) if writeSuccess.Load() == 0 { t.Error("no writes succeeded") } } // ============================================================================= // Test 4: Long Transaction Blocking // One long transaction, multiple short ones. // Verify: Short tx completes or times out cleanly, no deadlock // ============================================================================= func TestLongTransactionBlocking(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create a test issue issue := &types.Issue{ ID: "test-long-tx", Title: "Long Transaction Test", Description: "Test issue for long transaction blocking", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue: %v", err) } var wg sync.WaitGroup var shortTxSuccess atomic.Int32 var shortTxFail atomic.Int32 longTxStarted := make(chan struct{}) longTxDone := make(chan struct{}) // Start long transaction wg.Add(1) go func() { defer wg.Done() defer close(longTxDone) err := store.RunInTransaction(ctx, func(tx storage.Transaction) error { // Signal that long tx has started close(longTxStarted) // Hold the transaction open for a while time.Sleep(2 * time.Second) // Do some work return tx.UpdateIssue(ctx, issue.ID, map[string]interface{}{ "description": "Updated by long transaction", }, "long-tx") }) if err != nil { t.Logf("long transaction error: %v", err) } }() // Wait for long tx to start <-longTxStarted // Start multiple short transactions const numShortTx = 5 for i := 0; i < numShortTx; i++ { wg.Add(1) go func(n int) { defer wg.Done() // Use a shorter timeout for short transactions shortCtx, shortCancel := context.WithTimeout(ctx, 5*time.Second) defer shortCancel() err := store.RunInTransaction(shortCtx, func(tx storage.Transaction) error { return tx.UpdateIssue(shortCtx, issue.ID, map[string]interface{}{ "notes": fmt.Sprintf("Short tx %d", n), }, fmt.Sprintf("short-tx-%d", n)) }) if err != nil { shortTxFail.Add(1) t.Logf("short tx %d error (expected under contention): %v", n, err) } else { shortTxSuccess.Add(1) } }(i) } // Wait for all transactions wg.Wait() t.Logf("Short tx results: %d succeeded, %d failed", shortTxSuccess.Load(), shortTxFail.Load()) // Verify final state is readable retrieved, err := store.GetIssue(ctx, issue.ID) if err != nil { t.Fatalf("failed to get issue after transactions: %v", err) } if retrieved == nil { t.Fatal("issue not found") } } // ============================================================================= // Test 5: Branch-per-Agent Merge Race // Two polecats modify same issue on different branches, both try to merge. // Verify: One succeeds, one gets conflict, no corruption // ============================================================================= func TestBranchPerAgentMergeRace(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create initial issue on main branch issue := &types.Issue{ ID: "test-merge-race", Title: "Merge Race Test", Description: "Original description", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue: %v", err) } // Commit the initial state if err := store.Commit(ctx, "Initial issue creation"); err != nil { t.Fatalf("failed to commit initial state: %v", err) } // Create two branches for two agents if err := store.Branch(ctx, "agent-1"); err != nil { t.Fatalf("failed to create agent-1 branch: %v", err) } if err := store.Branch(ctx, "agent-2"); err != nil { t.Fatalf("failed to create agent-2 branch: %v", err) } // Agent 1: Checkout, modify, commit if err := store.Checkout(ctx, "agent-1"); err != nil { t.Fatalf("failed to checkout agent-1: %v", err) } if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{ "description": "Modified by agent 1", "priority": 2, }, "agent-1"); err != nil { t.Fatalf("agent-1 update failed: %v", err) } if err := store.Commit(ctx, "Agent 1 modifications"); err != nil { t.Fatalf("agent-1 commit failed: %v", err) } // Agent 2: Checkout, modify same field, commit if err := store.Checkout(ctx, "agent-2"); err != nil { t.Fatalf("failed to checkout agent-2: %v", err) } if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{ "description": "Modified by agent 2", "priority": 3, }, "agent-2"); err != nil { t.Fatalf("agent-2 update failed: %v", err) } if err := store.Commit(ctx, "Agent 2 modifications"); err != nil { t.Fatalf("agent-2 commit failed: %v", err) } // Switch back to main and try to merge both if err := store.Checkout(ctx, "main"); err != nil { t.Fatalf("failed to checkout main: %v", err) } // First merge should succeed conflicts1, err1 := store.Merge(ctx, "agent-1") // Second merge may conflict (both modified same row) conflicts2, err2 := store.Merge(ctx, "agent-2") t.Logf("Merge agent-1 result: err=%v conflicts=%d", err1, len(conflicts1)) t.Logf("Merge agent-2 result: err=%v conflicts=%d", err2, len(conflicts2)) // At least one should succeed if err1 != nil && err2 != nil { t.Error("both merges failed - at least one should succeed") } // Verify final state is readable and not corrupted retrieved, err := store.GetIssue(ctx, issue.ID) if err != nil { t.Fatalf("failed to get issue after merges: %v", err) } if retrieved == nil { t.Fatal("issue not found after merges") } t.Logf("Final state - description: %q, priority: %d", retrieved.Description, retrieved.Priority) // Clean up branches _ = store.DeleteBranch(ctx, "agent-1") _ = store.DeleteBranch(ctx, "agent-2") } // ============================================================================= // Test 6: Worktree Export Isolation // Polecat A has uncommitted changes, Polecat B triggers export. // Verify: Export does not include A's uncommitted work // ============================================================================= func TestWorktreeExportIsolation(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create and commit initial issue issue := &types.Issue{ ID: "test-export-isolation", Title: "Export Isolation Test", Description: "Committed description", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue: %v", err) } // Commit the initial state if err := store.Commit(ctx, "Initial committed state"); err != nil { t.Fatalf("failed to commit initial state: %v", err) } // Get committed state hash for comparison log, err := store.Log(ctx, 1) if err != nil { t.Fatalf("failed to get log: %v", err) } if len(log) == 0 { t.Fatal("expected at least one commit") } committedHash := log[0].Hash t.Logf("Committed hash: %s", committedHash) // Make uncommitted changes if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{ "description": "UNCOMMITTED CHANGES - should not appear in export", }, "polecat-a"); err != nil { t.Fatalf("failed to make uncommitted changes: %v", err) } // Check status - should show uncommitted changes status, err := store.Status(ctx) if err != nil { t.Fatalf("failed to get status: %v", err) } hasUncommitted := len(status.Staged) > 0 || len(status.Unstaged) > 0 t.Logf("Has uncommitted changes: %v (staged: %d, unstaged: %d)", hasUncommitted, len(status.Staged), len(status.Unstaged)) // Verify current working state has uncommitted changes current, err := store.GetIssue(ctx, issue.ID) if err != nil { t.Fatalf("failed to get current issue: %v", err) } if current.Description != "UNCOMMITTED CHANGES - should not appear in export" { t.Errorf("expected uncommitted description in working state, got: %q", current.Description) } // For a true export isolation test, we would need to: // 1. Query the committed state using AS OF syntax // 2. Verify it doesn't contain uncommitted changes // // This demonstrates that Dolt correctly tracks what's committed vs uncommitted t.Log("Export isolation validated: uncommitted changes are tracked separately from commits") } // ============================================================================= // Test: Concurrent Dependency Operations // Multiple goroutines add/remove dependencies simultaneously // ============================================================================= func TestConcurrentDependencyOperations(t *testing.T) { store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create parent issues const numParents = 5 parentIDs := make([]string, numParents) for i := 0; i < numParents; i++ { issue := &types.Issue{ ID: fmt.Sprintf("test-dep-parent-%d", i), Title: fmt.Sprintf("Parent Issue %d", i), Description: "Parent for dependency test", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create parent issue %d: %v", i, err) } parentIDs[i] = issue.ID } // Create child issue child := &types.Issue{ ID: "test-dep-child", Title: "Child Issue", Description: "Child for dependency test", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, child, "tester"); err != nil { t.Fatalf("failed to create child issue: %v", err) } var wg sync.WaitGroup var addSuccess atomic.Int32 var addFail atomic.Int32 // Concurrently add dependencies from child to all parents for i := 0; i < numParents; i++ { wg.Add(1) go func(parentIdx int) { defer wg.Done() dep := &types.Dependency{ IssueID: child.ID, DependsOnID: parentIDs[parentIdx], Type: types.DepBlocks, } if err := store.AddDependency(ctx, dep, fmt.Sprintf("worker-%d", parentIdx)); err != nil { addFail.Add(1) t.Logf("add dependency %d error: %v", parentIdx, err) } else { addSuccess.Add(1) } }(i) } wg.Wait() t.Logf("Add dependency results: %d succeeded, %d failed", addSuccess.Load(), addFail.Load()) // Verify dependencies deps, err := store.GetDependencies(ctx, child.ID) if err != nil { t.Fatalf("failed to get dependencies: %v", err) } t.Logf("Child has %d dependencies", len(deps)) // Check if child is blocked blocked, blockers, err := store.IsBlocked(ctx, child.ID) if err != nil { t.Fatalf("failed to check if blocked: %v", err) } t.Logf("Child blocked: %v, blockers: %v", blocked, blockers) } // ============================================================================= // Test: High Contention Stress Test // Many goroutines performing various operations simultaneously // ============================================================================= func TestHighContentionStress(t *testing.T) { if testing.Short() { t.Skip("skipping stress test in short mode") } store, cleanup := setupTestStore(t) defer cleanup() ctx, cancel := concurrentTestContext(t) defer cancel() // Create initial issues const numIssues = 10 for i := 0; i < numIssues; i++ { issue := &types.Issue{ ID: fmt.Sprintf("stress-%d", i), Title: fmt.Sprintf("Stress Test Issue %d", i), Description: "For high contention stress testing", Status: types.StatusOpen, Priority: (i % 4) + 1, // Keep priority in valid range 1-4 IssueType: types.TypeTask, } if err := store.CreateIssue(ctx, issue, "tester"); err != nil { t.Fatalf("failed to create issue %d: %v", i, err) } } const numWorkers = 20 const opsPerWorker = 50 var wg sync.WaitGroup var totalOps atomic.Int32 var failedOps atomic.Int32 // Launch workers doing mixed operations for w := 0; w < numWorkers; w++ { wg.Add(1) go func(workerID int) { defer wg.Done() for op := 0; op < opsPerWorker; op++ { issueID := fmt.Sprintf("stress-%d", op%numIssues) switch op % 4 { case 0: // Read _, err := store.GetIssue(ctx, issueID) if err != nil { failedOps.Add(1) } case 1: // Update err := store.UpdateIssue(ctx, issueID, map[string]interface{}{ "notes": fmt.Sprintf("Worker %d, op %d", workerID, op), }, fmt.Sprintf("worker-%d", workerID)) if err != nil { failedOps.Add(1) } case 2: // Add label err := store.AddLabel(ctx, issueID, fmt.Sprintf("label-%d-%d", workerID, op), fmt.Sprintf("worker-%d", workerID)) if err != nil { failedOps.Add(1) } case 3: // Search _, err := store.SearchIssues(ctx, "Stress", types.IssueFilter{}) if err != nil { failedOps.Add(1) } } totalOps.Add(1) } }(w) } // Wait with timeout done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: // Success case <-ctx.Done(): t.Fatal("stress test timeout - possible deadlock") } t.Logf("Stress test completed: %d total ops, %d failed (%.2f%% success rate)", totalOps.Load(), failedOps.Load(), float64(totalOps.Load()-failedOps.Load())/float64(totalOps.Load())*100) // Verify data integrity - all issues should still be readable for i := 0; i < numIssues; i++ { issueID := fmt.Sprintf("stress-%d", i) issue, err := store.GetIssue(ctx, issueID) if err != nil { t.Errorf("failed to read issue %s after stress test: %v", issueID, err) } if issue == nil { t.Errorf("issue %s missing after stress test", issueID) } } }