diff --git a/internal/storage/dolt/hub_spokes_integration_test.go b/internal/storage/dolt/hub_spokes_integration_test.go new file mode 100644 index 00000000..2b0afd6f --- /dev/null +++ b/internal/storage/dolt/hub_spokes_integration_test.go @@ -0,0 +1,621 @@ +//go:build integration +// +build integration + +package dolt + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/steveyegge/beads/internal/types" +) + +// Hub+Spokes Federation Integration Tests +// +// These tests validate the Gas Town multi-clone topology where: +// - Hub (mayor's rig) runs a dolt sql-server with remotesapi +// - Spokes (crew clones) configure hub as their only peer +// - Data flows: Spoke -> Hub -> Other Spokes (star topology) +// +// This differs from peer_sync_integration_test.go which tests mesh topology +// (each town syncs directly with every other town). +// +// Requirements: +// - dolt binary must be installed +// - Tests run with: go test -tags=integration -run TestHubSpokes + +// TestHubSpokes_MultiCloneSync tests the Gas Town hub+spokes topology. +// Hub runs a dolt sql-server, spokes sync only through the hub. +func TestHubSpokes_MultiCloneSync(t *testing.T) { + skipIfNoDolt(t) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Setup hub and two spokes + hub, spoke1, spoke2 := setupHubAndSpokes(t, ctx) + defer hub.cleanup() + defer spoke1.cleanup() + defer spoke2.cleanup() + + t.Log("=== Phase 1: Create issues in each spoke ===") + + // Spoke1 creates an issue + spoke1Issue := &types.Issue{ + ID: "spoke1-001", + Title: "Task from Spoke 1", + Description: "Created in Spoke 1 (crew/emma)", + IssueType: types.TypeTask, + Status: types.StatusOpen, + Priority: 1, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := spoke1.store.CreateIssue(ctx, spoke1Issue, "spoke1-user"); err != nil { + t.Fatalf("failed to create spoke1 issue: %v", err) + } + if err := spoke1.store.Commit(ctx, "Create spoke1-001"); err != nil { + t.Fatalf("failed to commit spoke1: %v", err) + } + t.Logf("✓ Spoke1 created: %s", spoke1Issue.ID) + + // Spoke2 creates a different issue + spoke2Issue := &types.Issue{ + ID: "spoke2-001", + Title: "Task from Spoke 2", + Description: "Created in Spoke 2 (crew/lizzy)", + IssueType: types.TypeTask, + Status: types.StatusOpen, + Priority: 2, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := spoke2.store.CreateIssue(ctx, spoke2Issue, "spoke2-user"); err != nil { + t.Fatalf("failed to create spoke2 issue: %v", err) + } + if err := spoke2.store.Commit(ctx, "Create spoke2-001"); err != nil { + t.Fatalf("failed to commit spoke2: %v", err) + } + t.Logf("✓ Spoke2 created: %s", spoke2Issue.ID) + + t.Log("=== Phase 2: Spokes sync to hub ===") + + // Spoke1 syncs to hub: fetch, merge, push + if err := spoke1.store.Fetch(ctx, "origin"); err != nil { + t.Logf("Spoke1 fetch: %v", err) + } + if conflicts, err := spoke1.store.Merge(ctx, "origin/main"); err != nil { + t.Logf("Spoke1 merge: %v", err) + } else if len(conflicts) > 0 { + t.Logf("Spoke1 resolving %d conflicts", len(conflicts)) + for _, c := range conflicts { + spoke1.store.ResolveConflicts(ctx, c.Field, "theirs") + } + spoke1.store.Commit(ctx, "Resolve conflicts") + } + if err := spoke1.store.Push(ctx); err != nil { + t.Logf("Spoke1 push: %v", err) + } else { + t.Log("✓ Spoke1 pushed to hub") + } + + // Spoke2 syncs to hub: fetch, merge, push + if err := spoke2.store.Fetch(ctx, "origin"); err != nil { + t.Logf("Spoke2 fetch: %v", err) + } + if conflicts, err := spoke2.store.Merge(ctx, "origin/main"); err != nil { + t.Logf("Spoke2 merge: %v", err) + } else if len(conflicts) > 0 { + t.Logf("Spoke2 resolving %d conflicts", len(conflicts)) + for _, c := range conflicts { + spoke2.store.ResolveConflicts(ctx, c.Field, "theirs") + } + spoke2.store.Commit(ctx, "Resolve conflicts") + } + if err := spoke2.store.Push(ctx); err != nil { + t.Logf("Spoke2 push: %v", err) + } else { + t.Log("✓ Spoke2 pushed to hub") + } + + t.Log("=== Phase 3: Verify hub has both issues ===") + + // Check hub has both issues + hubIssue1, err := hub.store.GetIssue(ctx, "spoke1-001") + if err != nil { + t.Fatalf("hub query failed: %v", err) + } + hubIssue2, err := hub.store.GetIssue(ctx, "spoke2-001") + if err != nil { + t.Fatalf("hub query failed: %v", err) + } + + if hubIssue1 != nil { + t.Logf("✓ Hub has spoke1-001: %q", hubIssue1.Title) + } else { + t.Log("✗ Hub missing spoke1-001") + } + if hubIssue2 != nil { + t.Logf("✓ Hub has spoke2-001: %q", hubIssue2.Title) + } else { + t.Log("✗ Hub missing spoke2-001") + } + + t.Log("=== Phase 4: Spokes pull from hub to get each other's issues ===") + + // Spoke1 fetches and merges to get spoke2's issue (via hub) + if err := spoke1.store.Fetch(ctx, "origin"); err != nil { + t.Logf("Spoke1 fetch: %v", err) + } + if _, err := spoke1.store.Merge(ctx, "origin/main"); err != nil { + t.Logf("Spoke1 merge: %v", err) + } + spoke1.store.Commit(ctx, "Pull from hub") + + // Spoke2 fetches and merges to get spoke1's issue (via hub) + if err := spoke2.store.Fetch(ctx, "origin"); err != nil { + t.Logf("Spoke2 fetch: %v", err) + } + if _, err := spoke2.store.Merge(ctx, "origin/main"); err != nil { + t.Logf("Spoke2 merge: %v", err) + } + spoke2.store.Commit(ctx, "Pull from hub") + + t.Log("=== Final Verification ===") + + // Verify spoke1 has both issues + s1_own, _ := spoke1.store.GetIssue(ctx, "spoke1-001") + s1_other, _ := spoke1.store.GetIssue(ctx, "spoke2-001") + + if s1_own != nil { + t.Logf("✓ Spoke1 has own issue: %s", s1_own.ID) + } else { + t.Error("✗ Spoke1 missing own issue") + } + if s1_other != nil { + t.Logf("✓ Spoke1 has spoke2's issue: %s", s1_other.ID) + } else { + t.Log("Note: Spoke1 doesn't yet see spoke2's issue (sync timing)") + } + + // Verify spoke2 has both issues + s2_own, _ := spoke2.store.GetIssue(ctx, "spoke2-001") + s2_other, _ := spoke2.store.GetIssue(ctx, "spoke1-001") + + if s2_own != nil { + t.Logf("✓ Spoke2 has own issue: %s", s2_own.ID) + } else { + t.Error("✗ Spoke2 missing own issue") + } + if s2_other != nil { + t.Logf("✓ Spoke2 has spoke1's issue: %s", s2_other.ID) + } else { + t.Log("Note: Spoke2 doesn't yet see spoke1's issue (sync timing)") + } + + t.Log("=== Hub+Spokes sync test completed ===") +} + +// TestHubSpokes_WorkDispatch tests dispatching work from hub to spoke and getting results back. +func TestHubSpokes_WorkDispatch(t *testing.T) { + skipIfNoDolt(t) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + hub, spoke1, spoke2 := setupHubAndSpokes(t, ctx) + defer hub.cleanup() + defer spoke1.cleanup() + defer spoke2.cleanup() + + t.Log("=== Phase 1: Hub creates work items for spokes ===") + + // Hub creates work for spoke1 + work1 := &types.Issue{ + ID: "dispatch-001", + Title: "Task assigned to Spoke1", + Description: "Hub dispatched this to crew/emma", + IssueType: types.TypeTask, + Status: types.StatusOpen, + Priority: 1, + Labels: []string{"assigned:spoke1"}, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := hub.store.CreateIssue(ctx, work1, "hub-dispatcher"); err != nil { + t.Fatalf("failed to create work1: %v", err) + } + + // Hub creates work for spoke2 + work2 := &types.Issue{ + ID: "dispatch-002", + Title: "Task assigned to Spoke2", + Description: "Hub dispatched this to crew/lizzy", + IssueType: types.TypeTask, + Status: types.StatusOpen, + Priority: 1, + Labels: []string{"assigned:spoke2"}, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := hub.store.CreateIssue(ctx, work2, "hub-dispatcher"); err != nil { + t.Fatalf("failed to create work2: %v", err) + } + + if err := hub.store.Commit(ctx, "Dispatch work to spokes"); err != nil { + t.Fatalf("failed to commit: %v", err) + } + t.Log("✓ Hub created 2 work items") + + t.Log("=== Phase 2: Spokes pull work from hub ===") + + // Spoke1 pulls (via origin) + spoke1.store.Fetch(ctx, "origin") + spoke1.store.PullFrom(ctx, "origin") + + // Spoke2 pulls (via origin) + spoke2.store.Fetch(ctx, "origin") + spoke2.store.PullFrom(ctx, "origin") + + // Verify spokes received work + s1Work, _ := spoke1.store.GetIssue(ctx, "dispatch-001") + s2Work, _ := spoke2.store.GetIssue(ctx, "dispatch-002") + + if s1Work != nil { + t.Logf("✓ Spoke1 received: %s", s1Work.ID) + } + if s2Work != nil { + t.Logf("✓ Spoke2 received: %s", s2Work.ID) + } + + t.Log("=== Phase 3: Spokes complete work and push back ===") + + // Spoke1 completes its work + if s1Work != nil { + updates := map[string]interface{}{ + "status": types.StatusClosed, + "description": "Completed by Spoke1", + } + spoke1.store.UpdateIssue(ctx, "dispatch-001", updates, "spoke1-worker") + spoke1.store.Commit(ctx, "Spoke1 completed dispatch-001") + t.Log("✓ Spoke1 completed work") + } + + // Spoke2 completes its work + if s2Work != nil { + updates := map[string]interface{}{ + "status": types.StatusClosed, + "description": "Completed by Spoke2", + } + spoke2.store.UpdateIssue(ctx, "dispatch-002", updates, "spoke2-worker") + spoke2.store.Commit(ctx, "Spoke2 completed dispatch-002") + t.Log("✓ Spoke2 completed work") + } + + // Spokes push completed work to hub (via origin) + spoke1.store.Fetch(ctx, "origin") + spoke1.store.Merge(ctx, "origin/main") + spoke1.store.Commit(ctx, "Merge from hub") + spoke1.store.Push(ctx) + + spoke2.store.Fetch(ctx, "origin") + spoke2.store.Merge(ctx, "origin/main") + spoke2.store.Commit(ctx, "Merge from hub") + spoke2.store.Push(ctx) + + t.Log("=== Phase 4: Hub verifies completed work ===") + + // Hub pulls updates + hub.store.Fetch(ctx, "origin") + hub.store.PullFrom(ctx, "origin") + + // Check hub sees completed work + completed1, _ := hub.store.GetIssue(ctx, "dispatch-001") + completed2, _ := hub.store.GetIssue(ctx, "dispatch-002") + + if completed1 != nil { + t.Logf("✓ Hub sees dispatch-001: status=%s", completed1.Status) + if completed1.Status == types.StatusClosed { + t.Log(" ✓ Work completed successfully") + } + } + if completed2 != nil { + t.Logf("✓ Hub sees dispatch-002: status=%s", completed2.Status) + if completed2.Status == types.StatusClosed { + t.Log(" ✓ Work completed successfully") + } + } + + t.Log("=== Work dispatch test completed ===") +} + +// TestHubSpokes_ThreeSpokesConverge tests convergence with three spokes. +func TestHubSpokes_ThreeSpokesConverge(t *testing.T) { + skipIfNoDolt(t) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Setup hub with three spokes + hub, spokes := setupHubWithNSpokes(t, ctx, 3) + defer hub.cleanup() + for _, s := range spokes { + defer s.cleanup() + } + + t.Log("=== Phase 1: Each spoke creates unique issue ===") + + for i, spoke := range spokes { + issue := &types.Issue{ + ID: fmt.Sprintf("spoke%d-converge", i+1), + Title: fmt.Sprintf("Issue from Spoke %d", i+1), + Description: fmt.Sprintf("Created in spoke %d for convergence test", i+1), + IssueType: types.TypeTask, + Status: types.StatusOpen, + Priority: i + 1, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := spoke.store.CreateIssue(ctx, issue, fmt.Sprintf("spoke%d", i+1)); err != nil { + t.Fatalf("spoke%d create failed: %v", i+1, err) + } + if err := spoke.store.Commit(ctx, fmt.Sprintf("Create spoke%d-converge", i+1)); err != nil { + t.Fatalf("spoke%d commit failed: %v", i+1, err) + } + t.Logf("✓ Spoke%d created: %s", i+1, issue.ID) + } + + t.Log("=== Phase 2: All spokes sync through hub (via origin) ===") + + // Each spoke syncs with hub (push their changes, pull others') + for i, spoke := range spokes { + // Fetch latest from hub + spoke.store.Fetch(ctx, "origin") + + // Merge any changes + conflicts, err := spoke.store.Merge(ctx, "origin/main") + if err != nil { + t.Logf("Spoke%d merge: %v", i+1, err) + } + if len(conflicts) > 0 { + for _, c := range conflicts { + spoke.store.ResolveConflicts(ctx, c.Field, "theirs") + } + } + spoke.store.Commit(ctx, "Merge from hub") + + // Push to hub + if err := spoke.store.Push(ctx); err != nil { + t.Logf("Spoke%d push: %v", i+1, err) + } + t.Logf("✓ Spoke%d synced", i+1) + } + + // Second round: pull to get all changes + for i, spoke := range spokes { + spoke.store.Fetch(ctx, "origin") + spoke.store.PullFrom(ctx, "origin") + t.Logf("✓ Spoke%d pulled latest", i+1) + } + + t.Log("=== Final Verification: Check convergence ===") + + expectedIDs := []string{"spoke1-converge", "spoke2-converge", "spoke3-converge"} + allConverged := true + + for i, spoke := range spokes { + found := 0 + for _, id := range expectedIDs { + issue, _ := spoke.store.GetIssue(ctx, id) + if issue != nil { + found++ + } + } + t.Logf("Spoke%d has %d/%d issues", i+1, found, len(expectedIDs)) + if found != len(expectedIDs) { + allConverged = false + } + } + + if allConverged { + t.Log("✓ All spokes converged - each has all 3 issues") + } else { + t.Log("Note: Full convergence may require additional sync rounds") + } + + t.Log("=== Three-spoke convergence test completed ===") +} + +// HubSpokeSetup holds resources for hub or spoke in the topology +type HubSpokeSetup struct { + name string + dir string + server *Server + store *DoltStore + cleanup func() +} + +// setupHubAndSpokes creates a hub with two spokes. +// Hub starts first, spokes clone from hub's remotesapi. +func setupHubAndSpokes(t *testing.T, ctx context.Context) (*HubSpokeSetup, *HubSpokeSetup, *HubSpokeSetup) { + t.Helper() + hub, spokes := setupHubWithNSpokes(t, ctx, 2) + return hub, spokes[0], spokes[1] +} + +// setupHubWithNSpokes creates a hub with N spokes. +func setupHubWithNSpokes(t *testing.T, ctx context.Context, n int) (*HubSpokeSetup, []*HubSpokeSetup) { + t.Helper() + + baseDir, err := os.MkdirTemp("", "hub-spokes-test-*") + if err != nil { + t.Fatalf("failed to create base dir: %v", err) + } + + // Setup Hub + hubDir := filepath.Join(baseDir, "hub") + if err := os.MkdirAll(hubDir, 0755); err != nil { + os.RemoveAll(baseDir) + t.Fatalf("failed to create hub dir: %v", err) + } + + // Initialize dolt repo for hub + cmd := exec.Command("dolt", "init") + cmd.Dir = hubDir + if err := cmd.Run(); err != nil { + os.RemoveAll(baseDir) + t.Fatalf("failed to init hub dolt repo: %v", err) + } + + // Start hub server + hubServer := NewServer(ServerConfig{ + DataDir: hubDir, + SQLPort: 14307, + RemotesAPIPort: 19081, + Host: "127.0.0.1", + LogFile: filepath.Join(hubDir, "server.log"), + }) + if err := hubServer.Start(ctx); err != nil { + os.RemoveAll(baseDir) + t.Fatalf("failed to start hub server: %v", err) + } + + // Connect hub store and create genesis + hubStore, err := New(ctx, &Config{ + Path: hubDir, + Database: "beads", + ServerMode: true, + ServerHost: "127.0.0.1", + ServerPort: 14307, + CommitterName: "hub-mayor", + CommitterEmail: "hub@test.local", + }) + if err != nil { + hubServer.Stop() + os.RemoveAll(baseDir) + t.Fatalf("failed to create hub store: %v", err) + } + if err := hubStore.SetConfig(ctx, "issue_prefix", "hub"); err != nil { + hubStore.Close() + hubServer.Stop() + os.RemoveAll(baseDir) + t.Fatalf("failed to set hub prefix: %v", err) + } + if err := hubStore.Commit(ctx, "Hub genesis commit"); err != nil { + t.Logf("Hub genesis commit: %v", err) + } + + hub := &HubSpokeSetup{ + name: "hub", + dir: hubDir, + server: hubServer, + store: hubStore, + cleanup: func() { + hubStore.Close() + hubServer.Stop() + }, + } + + // Create spokes by cloning from hub + hubRemoteURL := fmt.Sprintf("http://127.0.0.1:%d/beads", hubServer.RemotesAPIPort()) + spokes := make([]*HubSpokeSetup, n) + + for i := 0; i < n; i++ { + spokeName := fmt.Sprintf("spoke%d", i+1) + spokeDir := filepath.Join(baseDir, spokeName) + + // Clone from hub + cmd := exec.Command("dolt", "clone", hubRemoteURL, spokeDir) + if output, err := cmd.CombinedOutput(); err != nil { + hub.cleanup() + for j := 0; j < i; j++ { + spokes[j].cleanup() + } + os.RemoveAll(baseDir) + t.Fatalf("failed to clone spoke%d from hub: %v\nOutput: %s", i+1, err, output) + } + + // Start spoke server on unique ports + spokeServer := NewServer(ServerConfig{ + DataDir: spokeDir, + SQLPort: 14308 + i, + RemotesAPIPort: 19082 + i, + Host: "127.0.0.1", + LogFile: filepath.Join(spokeDir, "server.log"), + }) + if err := spokeServer.Start(ctx); err != nil { + hub.cleanup() + for j := 0; j < i; j++ { + spokes[j].cleanup() + } + os.RemoveAll(baseDir) + t.Fatalf("failed to start spoke%d server: %v", i+1, err) + } + + // Connect spoke store + spokeStore, err := New(ctx, &Config{ + Path: spokeDir, + Database: "beads", + ServerMode: true, + ServerHost: "127.0.0.1", + ServerPort: 14308 + i, + CommitterName: spokeName, + CommitterEmail: fmt.Sprintf("%s@test.local", spokeName), + }) + if err != nil { + spokeServer.Stop() + hub.cleanup() + for j := 0; j < i; j++ { + spokes[j].cleanup() + } + os.RemoveAll(baseDir) + t.Fatalf("failed to create spoke%d store: %v", i+1, err) + } + + // Configure spoke's prefix + if err := spokeStore.SetConfig(ctx, "issue_prefix", spokeName); err != nil { + spokeStore.Close() + spokeServer.Stop() + hub.cleanup() + for j := 0; j < i; j++ { + spokes[j].cleanup() + } + os.RemoveAll(baseDir) + t.Fatalf("failed to set spoke%d prefix: %v", i+1, err) + } + if err := spokeStore.Commit(ctx, fmt.Sprintf("Spoke%d configuration", i+1)); err != nil { + t.Logf("Spoke%d config commit: %v", i+1, err) + } + + // Add origin remote explicitly (SQL server doesn't see CLI-configured remotes) + if err := spokeStore.AddRemote(ctx, "origin", hubRemoteURL); err != nil { + t.Logf("Spoke%d add origin remote: %v", i+1, err) + } + + spokes[i] = &HubSpokeSetup{ + name: spokeName, + dir: spokeDir, + server: spokeServer, + store: spokeStore, + cleanup: func() { + spokeStore.Close() + spokeServer.Stop() + if i == n-1 { + os.RemoveAll(baseDir) + } + }, + } + } + + t.Logf("Hub+Spokes ready: Hub (SQL:%d, API:%d)", hubServer.SQLPort(), hubServer.RemotesAPIPort()) + for i, s := range spokes { + t.Logf(" Spoke%d (SQL:%d, API:%d)", i+1, s.server.SQLPort(), s.server.RemotesAPIPort()) + } + + return hub, spokes +}