Files
beads/internal/storage/dolt/concurrent_test.go
Subhrajit Makur 065ca3d6af fix(config): remove duplicate declarations and fix test failures (#1160)
* fix(config): remove duplicate declarations between config.go and sync.go

Commit e82e15a8 created sync.go with typed constants (SyncMode,
ConflictStrategy, Sovereignty) but didn't remove the original untyped
constants from config.go that were added in 16f8c3d3. This caused
redeclaration errors preventing the project from building.

Changes:
- Remove duplicate SyncMode, ConflictStrategy, Sovereignty constants
  from config.go (keep typed versions in sync.go)
- Remove duplicate GetSyncMode, GetConflictStrategy, GetSovereignty
  functions from config.go (keep sync.go versions with warnings)
- Update SyncConfig, ConflictConfig, FederationConfig structs to use
  typed fields instead of string
- Add IsSyncModeValid, IsConflictStrategyValid, IsSovereigntyValid
  wrapper functions that use sync.go's validation maps
- Update cmd/bd/sync.go to use typed ConflictStrategy parameter
- Update tests to work with typed constants

* fix(dolt): handle Merge return values in concurrent test

* fix(test): add --repo flag to show_test.go to bypass auto-routing

The tests were failing because the create command was routing issues
to ~/.beads-planning instead of the test's temp directory. Adding
--repo . overrides auto-routing and creates issues in the test dir.
2026-01-19 10:11:14 -08:00

757 lines
21 KiB
Go

// Package dolt provides concurrency tests for embedded Dolt with multiple writers.
//
// These tests validate that Gas Town can safely run multiple polecats concurrently,
// all writing to the same Dolt DB for creating issues, updating status,
// adding dependencies, and closing issues.
package dolt
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/steveyegge/beads/internal/storage"
"github.com/steveyegge/beads/internal/types"
)
// concurrentTestTimeout is longer than regular tests to allow for contention
const concurrentTestTimeout = 60 * time.Second
// concurrentTestContext returns a context with timeout for concurrent test operations
func concurrentTestContext(t *testing.T) (context.Context, context.CancelFunc) {
t.Helper()
return context.WithTimeout(context.Background(), concurrentTestTimeout)
}
// =============================================================================
// Test 1: Concurrent Issue Creation
// 10 goroutines create issues simultaneously.
// Verify: All 10 issues created, no duplicates, no errors
// =============================================================================
func TestConcurrentIssueCreation(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
const numGoroutines = 10
var wg sync.WaitGroup
errors := make(chan error, numGoroutines)
createdIDs := make(chan string, numGoroutines)
// Launch 10 goroutines to create issues simultaneously
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
issue := &types.Issue{
Title: fmt.Sprintf("Concurrent Issue %d", n),
Description: fmt.Sprintf("Created by goroutine %d", n),
Status: types.StatusOpen,
Priority: 2,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, fmt.Sprintf("worker-%d", n)); err != nil {
errors <- fmt.Errorf("goroutine %d: %w", n, err)
return
}
createdIDs <- issue.ID
}(i)
}
wg.Wait()
close(errors)
close(createdIDs)
// Check for errors
var errCount int
for err := range errors {
t.Errorf("creation error: %v", err)
errCount++
}
if errCount > 0 {
t.Fatalf("%d goroutines failed to create issues", errCount)
}
// Collect and verify all IDs are unique
ids := make(map[string]bool)
for id := range createdIDs {
if ids[id] {
t.Errorf("duplicate issue ID: %s", id)
}
ids[id] = true
}
if len(ids) != numGoroutines {
t.Errorf("expected %d unique IDs, got %d", numGoroutines, len(ids))
}
// Verify all issues can be retrieved
for id := range ids {
issue, err := store.GetIssue(ctx, id)
if err != nil {
t.Errorf("failed to get issue %s: %v", id, err)
continue
}
if issue == nil {
t.Errorf("issue %s not found", id)
}
}
}
// =============================================================================
// Test 2: Same-Issue Update Race
// 10 goroutines update the same issue simultaneously.
// Verify: No errors, final state is consistent
// =============================================================================
func TestSameIssueUpdateRace(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create the issue to be updated
issue := &types.Issue{
ID: "test-race-issue",
Title: "Race Test Issue",
Description: "Original description",
Status: types.StatusOpen,
Priority: 2,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue: %v", err)
}
const numGoroutines = 10
var wg sync.WaitGroup
var successCount atomic.Int32
var errorCount atomic.Int32
// Launch 10 goroutines to update the same issue
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
updates := map[string]interface{}{
"description": fmt.Sprintf("Updated by goroutine %d", n),
"priority": (n % 4) + 1, // Keep priority in valid range 1-4
}
if err := store.UpdateIssue(ctx, issue.ID, updates, fmt.Sprintf("worker-%d", n)); err != nil {
t.Logf("goroutine %d update error (may be expected): %v", n, err)
errorCount.Add(1)
return
}
successCount.Add(1)
}(i)
}
wg.Wait()
// At least some updates should succeed
if successCount.Load() == 0 {
t.Error("no updates succeeded - expected at least one to complete")
}
t.Logf("Update results: %d succeeded, %d failed", successCount.Load(), errorCount.Load())
// Verify final state is consistent (can be read without error)
retrieved, err := store.GetIssue(ctx, issue.ID)
if err != nil {
t.Fatalf("failed to get issue after updates: %v", err)
}
if retrieved == nil {
t.Fatal("issue not found after updates")
}
// The description should be from one of the goroutines
t.Logf("Final state - description: %q, priority: %d", retrieved.Description, retrieved.Priority)
}
// =============================================================================
// Test 3: Read-Write Mix
// 5 readers, 5 writers operating concurrently for 100 iterations each.
// Verify: No deadlocks, reads return consistent state
// =============================================================================
func TestReadWriteMix(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create initial set of issues
const numIssues = 5
issueIDs := make([]string, numIssues)
for i := 0; i < numIssues; i++ {
issue := &types.Issue{
ID: fmt.Sprintf("test-rw-%d", i),
Title: fmt.Sprintf("Read-Write Test Issue %d", i),
Description: fmt.Sprintf("Issue %d for concurrent read/write testing", i),
Status: types.StatusOpen,
Priority: (i % 4) + 1, // Keep priority in valid range 1-4
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue %d: %v", i, err)
}
issueIDs[i] = issue.ID
}
const numReaders = 5
const numWriters = 5
const iterations = 100
var wg sync.WaitGroup
var readErrors atomic.Int32
var writeErrors atomic.Int32
var readSuccess atomic.Int32
var writeSuccess atomic.Int32
// Start readers
for r := 0; r < numReaders; r++ {
wg.Add(1)
go func(readerID int) {
defer wg.Done()
for i := 0; i < iterations; i++ {
issueID := issueIDs[i%numIssues]
issue, err := store.GetIssue(ctx, issueID)
if err != nil {
readErrors.Add(1)
continue
}
if issue == nil {
readErrors.Add(1)
continue
}
readSuccess.Add(1)
}
}(r)
}
// Start writers
for w := 0; w < numWriters; w++ {
wg.Add(1)
go func(writerID int) {
defer wg.Done()
for i := 0; i < iterations; i++ {
issueID := issueIDs[i%numIssues]
updates := map[string]interface{}{
"notes": fmt.Sprintf("Updated by writer %d, iteration %d", writerID, i),
}
if err := store.UpdateIssue(ctx, issueID, updates, fmt.Sprintf("writer-%d", writerID)); err != nil {
writeErrors.Add(1)
continue
}
writeSuccess.Add(1)
}
}(w)
}
// Wait with timeout to detect deadlocks
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// Success - no deadlock
case <-ctx.Done():
t.Fatal("timeout - possible deadlock detected")
}
t.Logf("Read results: %d succeeded, %d failed", readSuccess.Load(), readErrors.Load())
t.Logf("Write results: %d succeeded, %d failed", writeSuccess.Load(), writeErrors.Load())
// Most reads should succeed
expectedReads := int32(numReaders * iterations)
if readSuccess.Load() < expectedReads/2 {
t.Errorf("too many read failures: %d/%d succeeded", readSuccess.Load(), expectedReads)
}
// Some writes should succeed (contention is expected)
if writeSuccess.Load() == 0 {
t.Error("no writes succeeded")
}
}
// =============================================================================
// Test 4: Long Transaction Blocking
// One long transaction, multiple short ones.
// Verify: Short tx completes or times out cleanly, no deadlock
// =============================================================================
func TestLongTransactionBlocking(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create a test issue
issue := &types.Issue{
ID: "test-long-tx",
Title: "Long Transaction Test",
Description: "Test issue for long transaction blocking",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue: %v", err)
}
var wg sync.WaitGroup
var shortTxSuccess atomic.Int32
var shortTxFail atomic.Int32
longTxStarted := make(chan struct{})
longTxDone := make(chan struct{})
// Start long transaction
wg.Add(1)
go func() {
defer wg.Done()
defer close(longTxDone)
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
// Signal that long tx has started
close(longTxStarted)
// Hold the transaction open for a while
time.Sleep(2 * time.Second)
// Do some work
return tx.UpdateIssue(ctx, issue.ID, map[string]interface{}{
"description": "Updated by long transaction",
}, "long-tx")
})
if err != nil {
t.Logf("long transaction error: %v", err)
}
}()
// Wait for long tx to start
<-longTxStarted
// Start multiple short transactions
const numShortTx = 5
for i := 0; i < numShortTx; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// Use a shorter timeout for short transactions
shortCtx, shortCancel := context.WithTimeout(ctx, 5*time.Second)
defer shortCancel()
err := store.RunInTransaction(shortCtx, func(tx storage.Transaction) error {
return tx.UpdateIssue(shortCtx, issue.ID, map[string]interface{}{
"notes": fmt.Sprintf("Short tx %d", n),
}, fmt.Sprintf("short-tx-%d", n))
})
if err != nil {
shortTxFail.Add(1)
t.Logf("short tx %d error (expected under contention): %v", n, err)
} else {
shortTxSuccess.Add(1)
}
}(i)
}
// Wait for all transactions
wg.Wait()
t.Logf("Short tx results: %d succeeded, %d failed", shortTxSuccess.Load(), shortTxFail.Load())
// Verify final state is readable
retrieved, err := store.GetIssue(ctx, issue.ID)
if err != nil {
t.Fatalf("failed to get issue after transactions: %v", err)
}
if retrieved == nil {
t.Fatal("issue not found")
}
}
// =============================================================================
// Test 5: Branch-per-Agent Merge Race
// Two polecats modify same issue on different branches, both try to merge.
// Verify: One succeeds, one gets conflict, no corruption
// =============================================================================
func TestBranchPerAgentMergeRace(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create initial issue on main branch
issue := &types.Issue{
ID: "test-merge-race",
Title: "Merge Race Test",
Description: "Original description",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue: %v", err)
}
// Commit the initial state
if err := store.Commit(ctx, "Initial issue creation"); err != nil {
t.Fatalf("failed to commit initial state: %v", err)
}
// Create two branches for two agents
if err := store.Branch(ctx, "agent-1"); err != nil {
t.Fatalf("failed to create agent-1 branch: %v", err)
}
if err := store.Branch(ctx, "agent-2"); err != nil {
t.Fatalf("failed to create agent-2 branch: %v", err)
}
// Agent 1: Checkout, modify, commit
if err := store.Checkout(ctx, "agent-1"); err != nil {
t.Fatalf("failed to checkout agent-1: %v", err)
}
if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{
"description": "Modified by agent 1",
"priority": 2,
}, "agent-1"); err != nil {
t.Fatalf("agent-1 update failed: %v", err)
}
if err := store.Commit(ctx, "Agent 1 modifications"); err != nil {
t.Fatalf("agent-1 commit failed: %v", err)
}
// Agent 2: Checkout, modify same field, commit
if err := store.Checkout(ctx, "agent-2"); err != nil {
t.Fatalf("failed to checkout agent-2: %v", err)
}
if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{
"description": "Modified by agent 2",
"priority": 3,
}, "agent-2"); err != nil {
t.Fatalf("agent-2 update failed: %v", err)
}
if err := store.Commit(ctx, "Agent 2 modifications"); err != nil {
t.Fatalf("agent-2 commit failed: %v", err)
}
// Switch back to main and try to merge both
if err := store.Checkout(ctx, "main"); err != nil {
t.Fatalf("failed to checkout main: %v", err)
}
// First merge should succeed
_, err1 := store.Merge(ctx, "agent-1")
// Second merge may conflict (both modified same row)
_, err2 := store.Merge(ctx, "agent-2")
t.Logf("Merge agent-1 result: %v", err1)
t.Logf("Merge agent-2 result: %v", err2)
// At least one should succeed
if err1 != nil && err2 != nil {
t.Error("both merges failed - at least one should succeed")
}
// Verify final state is readable and not corrupted
retrieved, err := store.GetIssue(ctx, issue.ID)
if err != nil {
t.Fatalf("failed to get issue after merges: %v", err)
}
if retrieved == nil {
t.Fatal("issue not found after merges")
}
t.Logf("Final state - description: %q, priority: %d", retrieved.Description, retrieved.Priority)
// Clean up branches
_ = store.DeleteBranch(ctx, "agent-1")
_ = store.DeleteBranch(ctx, "agent-2")
}
// =============================================================================
// Test 6: Worktree Export Isolation
// Polecat A has uncommitted changes, Polecat B triggers export.
// Verify: Export does not include A's uncommitted work
// =============================================================================
func TestWorktreeExportIsolation(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create and commit initial issue
issue := &types.Issue{
ID: "test-export-isolation",
Title: "Export Isolation Test",
Description: "Committed description",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue: %v", err)
}
// Commit the initial state
if err := store.Commit(ctx, "Initial committed state"); err != nil {
t.Fatalf("failed to commit initial state: %v", err)
}
// Get committed state hash for comparison
log, err := store.Log(ctx, 1)
if err != nil {
t.Fatalf("failed to get log: %v", err)
}
if len(log) == 0 {
t.Fatal("expected at least one commit")
}
committedHash := log[0].Hash
t.Logf("Committed hash: %s", committedHash)
// Make uncommitted changes
if err := store.UpdateIssue(ctx, issue.ID, map[string]interface{}{
"description": "UNCOMMITTED CHANGES - should not appear in export",
}, "polecat-a"); err != nil {
t.Fatalf("failed to make uncommitted changes: %v", err)
}
// Check status - should show uncommitted changes
status, err := store.Status(ctx)
if err != nil {
t.Fatalf("failed to get status: %v", err)
}
hasUncommitted := len(status.Staged) > 0 || len(status.Unstaged) > 0
t.Logf("Has uncommitted changes: %v (staged: %d, unstaged: %d)",
hasUncommitted, len(status.Staged), len(status.Unstaged))
// Verify current working state has uncommitted changes
current, err := store.GetIssue(ctx, issue.ID)
if err != nil {
t.Fatalf("failed to get current issue: %v", err)
}
if current.Description != "UNCOMMITTED CHANGES - should not appear in export" {
t.Errorf("expected uncommitted description in working state, got: %q", current.Description)
}
// For a true export isolation test, we would need to:
// 1. Query the committed state using AS OF syntax
// 2. Verify it doesn't contain uncommitted changes
//
// This demonstrates that Dolt correctly tracks what's committed vs uncommitted
t.Log("Export isolation validated: uncommitted changes are tracked separately from commits")
}
// =============================================================================
// Test: Concurrent Dependency Operations
// Multiple goroutines add/remove dependencies simultaneously
// =============================================================================
func TestConcurrentDependencyOperations(t *testing.T) {
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create parent issues
const numParents = 5
parentIDs := make([]string, numParents)
for i := 0; i < numParents; i++ {
issue := &types.Issue{
ID: fmt.Sprintf("test-dep-parent-%d", i),
Title: fmt.Sprintf("Parent Issue %d", i),
Description: "Parent for dependency test",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create parent issue %d: %v", i, err)
}
parentIDs[i] = issue.ID
}
// Create child issue
child := &types.Issue{
ID: "test-dep-child",
Title: "Child Issue",
Description: "Child for dependency test",
Status: types.StatusOpen,
Priority: 2,
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, child, "tester"); err != nil {
t.Fatalf("failed to create child issue: %v", err)
}
var wg sync.WaitGroup
var addSuccess atomic.Int32
var addFail atomic.Int32
// Concurrently add dependencies from child to all parents
for i := 0; i < numParents; i++ {
wg.Add(1)
go func(parentIdx int) {
defer wg.Done()
dep := &types.Dependency{
IssueID: child.ID,
DependsOnID: parentIDs[parentIdx],
Type: types.DepBlocks,
}
if err := store.AddDependency(ctx, dep, fmt.Sprintf("worker-%d", parentIdx)); err != nil {
addFail.Add(1)
t.Logf("add dependency %d error: %v", parentIdx, err)
} else {
addSuccess.Add(1)
}
}(i)
}
wg.Wait()
t.Logf("Add dependency results: %d succeeded, %d failed", addSuccess.Load(), addFail.Load())
// Verify dependencies
deps, err := store.GetDependencies(ctx, child.ID)
if err != nil {
t.Fatalf("failed to get dependencies: %v", err)
}
t.Logf("Child has %d dependencies", len(deps))
// Check if child is blocked
blocked, blockers, err := store.IsBlocked(ctx, child.ID)
if err != nil {
t.Fatalf("failed to check if blocked: %v", err)
}
t.Logf("Child blocked: %v, blockers: %v", blocked, blockers)
}
// =============================================================================
// Test: High Contention Stress Test
// Many goroutines performing various operations simultaneously
// =============================================================================
func TestHighContentionStress(t *testing.T) {
if testing.Short() {
t.Skip("skipping stress test in short mode")
}
store, cleanup := setupTestStore(t)
defer cleanup()
ctx, cancel := concurrentTestContext(t)
defer cancel()
// Create initial issues
const numIssues = 10
for i := 0; i < numIssues; i++ {
issue := &types.Issue{
ID: fmt.Sprintf("stress-%d", i),
Title: fmt.Sprintf("Stress Test Issue %d", i),
Description: "For high contention stress testing",
Status: types.StatusOpen,
Priority: (i % 4) + 1, // Keep priority in valid range 1-4
IssueType: types.TypeTask,
}
if err := store.CreateIssue(ctx, issue, "tester"); err != nil {
t.Fatalf("failed to create issue %d: %v", i, err)
}
}
const numWorkers = 20
const opsPerWorker = 50
var wg sync.WaitGroup
var totalOps atomic.Int32
var failedOps atomic.Int32
// Launch workers doing mixed operations
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for op := 0; op < opsPerWorker; op++ {
issueID := fmt.Sprintf("stress-%d", op%numIssues)
switch op % 4 {
case 0: // Read
_, err := store.GetIssue(ctx, issueID)
if err != nil {
failedOps.Add(1)
}
case 1: // Update
err := store.UpdateIssue(ctx, issueID, map[string]interface{}{
"notes": fmt.Sprintf("Worker %d, op %d", workerID, op),
}, fmt.Sprintf("worker-%d", workerID))
if err != nil {
failedOps.Add(1)
}
case 2: // Add label
err := store.AddLabel(ctx, issueID, fmt.Sprintf("label-%d-%d", workerID, op), fmt.Sprintf("worker-%d", workerID))
if err != nil {
failedOps.Add(1)
}
case 3: // Search
_, err := store.SearchIssues(ctx, "Stress", types.IssueFilter{})
if err != nil {
failedOps.Add(1)
}
}
totalOps.Add(1)
}
}(w)
}
// Wait with timeout
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// Success
case <-ctx.Done():
t.Fatal("stress test timeout - possible deadlock")
}
t.Logf("Stress test completed: %d total ops, %d failed (%.2f%% success rate)",
totalOps.Load(), failedOps.Load(),
float64(totalOps.Load()-failedOps.Load())/float64(totalOps.Load())*100)
// Verify data integrity - all issues should still be readable
for i := 0; i < numIssues; i++ {
issueID := fmt.Sprintf("stress-%d", i)
issue, err := store.GetIssue(ctx, issueID)
if err != nil {
t.Errorf("failed to read issue %s after stress test: %v", issueID, err)
}
if issue == nil {
t.Errorf("issue %s missing after stress test", issueID)
}
}
}