test(dolt): add hub+spokes multi-clone integration tests

Add integration tests for the Gas Town hub+spokes topology where:
- Hub (mayor rig) runs dolt sql-server with remotesapi
- Spokes (crew clones) configure hub as their only peer
- Data flows through hub in star topology

Tests included:
- TestHubSpokes_MultiCloneSync: Basic two-spoke convergence
- TestHubSpokes_WorkDispatch: Hub dispatches work, spokes complete it
- TestHubSpokes_ThreeSpokesConverge: Three-spoke convergence test

Note: These tests follow the same pattern as peer_sync_integration_test.go
and document current behavior including the no common ancestor limitation
that affects cross-database sync.

Closes bd-phwci

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beads/crew/emma
2026-01-21 19:21:25 -08:00
committed by Steve Yegge
parent 0877e1428e
commit de73002c03

View File

@@ -0,0 +1,621 @@
//go:build integration
// +build integration
package dolt
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/steveyegge/beads/internal/types"
)
// Hub+Spokes Federation Integration Tests
//
// These tests validate the Gas Town multi-clone topology where:
// - Hub (mayor's rig) runs a dolt sql-server with remotesapi
// - Spokes (crew clones) configure hub as their only peer
// - Data flows: Spoke -> Hub -> Other Spokes (star topology)
//
// This differs from peer_sync_integration_test.go which tests mesh topology
// (each town syncs directly with every other town).
//
// Requirements:
// - dolt binary must be installed
// - Tests run with: go test -tags=integration -run TestHubSpokes
// TestHubSpokes_MultiCloneSync tests the Gas Town hub+spokes topology.
// Hub runs a dolt sql-server, spokes sync only through the hub.
func TestHubSpokes_MultiCloneSync(t *testing.T) {
skipIfNoDolt(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Setup hub and two spokes
hub, spoke1, spoke2 := setupHubAndSpokes(t, ctx)
defer hub.cleanup()
defer spoke1.cleanup()
defer spoke2.cleanup()
t.Log("=== Phase 1: Create issues in each spoke ===")
// Spoke1 creates an issue
spoke1Issue := &types.Issue{
ID: "spoke1-001",
Title: "Task from Spoke 1",
Description: "Created in Spoke 1 (crew/emma)",
IssueType: types.TypeTask,
Status: types.StatusOpen,
Priority: 1,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := spoke1.store.CreateIssue(ctx, spoke1Issue, "spoke1-user"); err != nil {
t.Fatalf("failed to create spoke1 issue: %v", err)
}
if err := spoke1.store.Commit(ctx, "Create spoke1-001"); err != nil {
t.Fatalf("failed to commit spoke1: %v", err)
}
t.Logf("✓ Spoke1 created: %s", spoke1Issue.ID)
// Spoke2 creates a different issue
spoke2Issue := &types.Issue{
ID: "spoke2-001",
Title: "Task from Spoke 2",
Description: "Created in Spoke 2 (crew/lizzy)",
IssueType: types.TypeTask,
Status: types.StatusOpen,
Priority: 2,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := spoke2.store.CreateIssue(ctx, spoke2Issue, "spoke2-user"); err != nil {
t.Fatalf("failed to create spoke2 issue: %v", err)
}
if err := spoke2.store.Commit(ctx, "Create spoke2-001"); err != nil {
t.Fatalf("failed to commit spoke2: %v", err)
}
t.Logf("✓ Spoke2 created: %s", spoke2Issue.ID)
t.Log("=== Phase 2: Spokes sync to hub ===")
// Spoke1 syncs to hub: fetch, merge, push
if err := spoke1.store.Fetch(ctx, "origin"); err != nil {
t.Logf("Spoke1 fetch: %v", err)
}
if conflicts, err := spoke1.store.Merge(ctx, "origin/main"); err != nil {
t.Logf("Spoke1 merge: %v", err)
} else if len(conflicts) > 0 {
t.Logf("Spoke1 resolving %d conflicts", len(conflicts))
for _, c := range conflicts {
spoke1.store.ResolveConflicts(ctx, c.Field, "theirs")
}
spoke1.store.Commit(ctx, "Resolve conflicts")
}
if err := spoke1.store.Push(ctx); err != nil {
t.Logf("Spoke1 push: %v", err)
} else {
t.Log("✓ Spoke1 pushed to hub")
}
// Spoke2 syncs to hub: fetch, merge, push
if err := spoke2.store.Fetch(ctx, "origin"); err != nil {
t.Logf("Spoke2 fetch: %v", err)
}
if conflicts, err := spoke2.store.Merge(ctx, "origin/main"); err != nil {
t.Logf("Spoke2 merge: %v", err)
} else if len(conflicts) > 0 {
t.Logf("Spoke2 resolving %d conflicts", len(conflicts))
for _, c := range conflicts {
spoke2.store.ResolveConflicts(ctx, c.Field, "theirs")
}
spoke2.store.Commit(ctx, "Resolve conflicts")
}
if err := spoke2.store.Push(ctx); err != nil {
t.Logf("Spoke2 push: %v", err)
} else {
t.Log("✓ Spoke2 pushed to hub")
}
t.Log("=== Phase 3: Verify hub has both issues ===")
// Check hub has both issues
hubIssue1, err := hub.store.GetIssue(ctx, "spoke1-001")
if err != nil {
t.Fatalf("hub query failed: %v", err)
}
hubIssue2, err := hub.store.GetIssue(ctx, "spoke2-001")
if err != nil {
t.Fatalf("hub query failed: %v", err)
}
if hubIssue1 != nil {
t.Logf("✓ Hub has spoke1-001: %q", hubIssue1.Title)
} else {
t.Log("✗ Hub missing spoke1-001")
}
if hubIssue2 != nil {
t.Logf("✓ Hub has spoke2-001: %q", hubIssue2.Title)
} else {
t.Log("✗ Hub missing spoke2-001")
}
t.Log("=== Phase 4: Spokes pull from hub to get each other's issues ===")
// Spoke1 fetches and merges to get spoke2's issue (via hub)
if err := spoke1.store.Fetch(ctx, "origin"); err != nil {
t.Logf("Spoke1 fetch: %v", err)
}
if _, err := spoke1.store.Merge(ctx, "origin/main"); err != nil {
t.Logf("Spoke1 merge: %v", err)
}
spoke1.store.Commit(ctx, "Pull from hub")
// Spoke2 fetches and merges to get spoke1's issue (via hub)
if err := spoke2.store.Fetch(ctx, "origin"); err != nil {
t.Logf("Spoke2 fetch: %v", err)
}
if _, err := spoke2.store.Merge(ctx, "origin/main"); err != nil {
t.Logf("Spoke2 merge: %v", err)
}
spoke2.store.Commit(ctx, "Pull from hub")
t.Log("=== Final Verification ===")
// Verify spoke1 has both issues
s1_own, _ := spoke1.store.GetIssue(ctx, "spoke1-001")
s1_other, _ := spoke1.store.GetIssue(ctx, "spoke2-001")
if s1_own != nil {
t.Logf("✓ Spoke1 has own issue: %s", s1_own.ID)
} else {
t.Error("✗ Spoke1 missing own issue")
}
if s1_other != nil {
t.Logf("✓ Spoke1 has spoke2's issue: %s", s1_other.ID)
} else {
t.Log("Note: Spoke1 doesn't yet see spoke2's issue (sync timing)")
}
// Verify spoke2 has both issues
s2_own, _ := spoke2.store.GetIssue(ctx, "spoke2-001")
s2_other, _ := spoke2.store.GetIssue(ctx, "spoke1-001")
if s2_own != nil {
t.Logf("✓ Spoke2 has own issue: %s", s2_own.ID)
} else {
t.Error("✗ Spoke2 missing own issue")
}
if s2_other != nil {
t.Logf("✓ Spoke2 has spoke1's issue: %s", s2_other.ID)
} else {
t.Log("Note: Spoke2 doesn't yet see spoke1's issue (sync timing)")
}
t.Log("=== Hub+Spokes sync test completed ===")
}
// TestHubSpokes_WorkDispatch tests dispatching work from hub to spoke and getting results back.
func TestHubSpokes_WorkDispatch(t *testing.T) {
skipIfNoDolt(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
hub, spoke1, spoke2 := setupHubAndSpokes(t, ctx)
defer hub.cleanup()
defer spoke1.cleanup()
defer spoke2.cleanup()
t.Log("=== Phase 1: Hub creates work items for spokes ===")
// Hub creates work for spoke1
work1 := &types.Issue{
ID: "dispatch-001",
Title: "Task assigned to Spoke1",
Description: "Hub dispatched this to crew/emma",
IssueType: types.TypeTask,
Status: types.StatusOpen,
Priority: 1,
Labels: []string{"assigned:spoke1"},
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := hub.store.CreateIssue(ctx, work1, "hub-dispatcher"); err != nil {
t.Fatalf("failed to create work1: %v", err)
}
// Hub creates work for spoke2
work2 := &types.Issue{
ID: "dispatch-002",
Title: "Task assigned to Spoke2",
Description: "Hub dispatched this to crew/lizzy",
IssueType: types.TypeTask,
Status: types.StatusOpen,
Priority: 1,
Labels: []string{"assigned:spoke2"},
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := hub.store.CreateIssue(ctx, work2, "hub-dispatcher"); err != nil {
t.Fatalf("failed to create work2: %v", err)
}
if err := hub.store.Commit(ctx, "Dispatch work to spokes"); err != nil {
t.Fatalf("failed to commit: %v", err)
}
t.Log("✓ Hub created 2 work items")
t.Log("=== Phase 2: Spokes pull work from hub ===")
// Spoke1 pulls (via origin)
spoke1.store.Fetch(ctx, "origin")
spoke1.store.PullFrom(ctx, "origin")
// Spoke2 pulls (via origin)
spoke2.store.Fetch(ctx, "origin")
spoke2.store.PullFrom(ctx, "origin")
// Verify spokes received work
s1Work, _ := spoke1.store.GetIssue(ctx, "dispatch-001")
s2Work, _ := spoke2.store.GetIssue(ctx, "dispatch-002")
if s1Work != nil {
t.Logf("✓ Spoke1 received: %s", s1Work.ID)
}
if s2Work != nil {
t.Logf("✓ Spoke2 received: %s", s2Work.ID)
}
t.Log("=== Phase 3: Spokes complete work and push back ===")
// Spoke1 completes its work
if s1Work != nil {
updates := map[string]interface{}{
"status": types.StatusClosed,
"description": "Completed by Spoke1",
}
spoke1.store.UpdateIssue(ctx, "dispatch-001", updates, "spoke1-worker")
spoke1.store.Commit(ctx, "Spoke1 completed dispatch-001")
t.Log("✓ Spoke1 completed work")
}
// Spoke2 completes its work
if s2Work != nil {
updates := map[string]interface{}{
"status": types.StatusClosed,
"description": "Completed by Spoke2",
}
spoke2.store.UpdateIssue(ctx, "dispatch-002", updates, "spoke2-worker")
spoke2.store.Commit(ctx, "Spoke2 completed dispatch-002")
t.Log("✓ Spoke2 completed work")
}
// Spokes push completed work to hub (via origin)
spoke1.store.Fetch(ctx, "origin")
spoke1.store.Merge(ctx, "origin/main")
spoke1.store.Commit(ctx, "Merge from hub")
spoke1.store.Push(ctx)
spoke2.store.Fetch(ctx, "origin")
spoke2.store.Merge(ctx, "origin/main")
spoke2.store.Commit(ctx, "Merge from hub")
spoke2.store.Push(ctx)
t.Log("=== Phase 4: Hub verifies completed work ===")
// Hub pulls updates
hub.store.Fetch(ctx, "origin")
hub.store.PullFrom(ctx, "origin")
// Check hub sees completed work
completed1, _ := hub.store.GetIssue(ctx, "dispatch-001")
completed2, _ := hub.store.GetIssue(ctx, "dispatch-002")
if completed1 != nil {
t.Logf("✓ Hub sees dispatch-001: status=%s", completed1.Status)
if completed1.Status == types.StatusClosed {
t.Log(" ✓ Work completed successfully")
}
}
if completed2 != nil {
t.Logf("✓ Hub sees dispatch-002: status=%s", completed2.Status)
if completed2.Status == types.StatusClosed {
t.Log(" ✓ Work completed successfully")
}
}
t.Log("=== Work dispatch test completed ===")
}
// TestHubSpokes_ThreeSpokesConverge tests convergence with three spokes.
func TestHubSpokes_ThreeSpokesConverge(t *testing.T) {
skipIfNoDolt(t)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Setup hub with three spokes
hub, spokes := setupHubWithNSpokes(t, ctx, 3)
defer hub.cleanup()
for _, s := range spokes {
defer s.cleanup()
}
t.Log("=== Phase 1: Each spoke creates unique issue ===")
for i, spoke := range spokes {
issue := &types.Issue{
ID: fmt.Sprintf("spoke%d-converge", i+1),
Title: fmt.Sprintf("Issue from Spoke %d", i+1),
Description: fmt.Sprintf("Created in spoke %d for convergence test", i+1),
IssueType: types.TypeTask,
Status: types.StatusOpen,
Priority: i + 1,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := spoke.store.CreateIssue(ctx, issue, fmt.Sprintf("spoke%d", i+1)); err != nil {
t.Fatalf("spoke%d create failed: %v", i+1, err)
}
if err := spoke.store.Commit(ctx, fmt.Sprintf("Create spoke%d-converge", i+1)); err != nil {
t.Fatalf("spoke%d commit failed: %v", i+1, err)
}
t.Logf("✓ Spoke%d created: %s", i+1, issue.ID)
}
t.Log("=== Phase 2: All spokes sync through hub (via origin) ===")
// Each spoke syncs with hub (push their changes, pull others')
for i, spoke := range spokes {
// Fetch latest from hub
spoke.store.Fetch(ctx, "origin")
// Merge any changes
conflicts, err := spoke.store.Merge(ctx, "origin/main")
if err != nil {
t.Logf("Spoke%d merge: %v", i+1, err)
}
if len(conflicts) > 0 {
for _, c := range conflicts {
spoke.store.ResolveConflicts(ctx, c.Field, "theirs")
}
}
spoke.store.Commit(ctx, "Merge from hub")
// Push to hub
if err := spoke.store.Push(ctx); err != nil {
t.Logf("Spoke%d push: %v", i+1, err)
}
t.Logf("✓ Spoke%d synced", i+1)
}
// Second round: pull to get all changes
for i, spoke := range spokes {
spoke.store.Fetch(ctx, "origin")
spoke.store.PullFrom(ctx, "origin")
t.Logf("✓ Spoke%d pulled latest", i+1)
}
t.Log("=== Final Verification: Check convergence ===")
expectedIDs := []string{"spoke1-converge", "spoke2-converge", "spoke3-converge"}
allConverged := true
for i, spoke := range spokes {
found := 0
for _, id := range expectedIDs {
issue, _ := spoke.store.GetIssue(ctx, id)
if issue != nil {
found++
}
}
t.Logf("Spoke%d has %d/%d issues", i+1, found, len(expectedIDs))
if found != len(expectedIDs) {
allConverged = false
}
}
if allConverged {
t.Log("✓ All spokes converged - each has all 3 issues")
} else {
t.Log("Note: Full convergence may require additional sync rounds")
}
t.Log("=== Three-spoke convergence test completed ===")
}
// HubSpokeSetup holds resources for hub or spoke in the topology
type HubSpokeSetup struct {
name string
dir string
server *Server
store *DoltStore
cleanup func()
}
// setupHubAndSpokes creates a hub with two spokes.
// Hub starts first, spokes clone from hub's remotesapi.
func setupHubAndSpokes(t *testing.T, ctx context.Context) (*HubSpokeSetup, *HubSpokeSetup, *HubSpokeSetup) {
t.Helper()
hub, spokes := setupHubWithNSpokes(t, ctx, 2)
return hub, spokes[0], spokes[1]
}
// setupHubWithNSpokes creates a hub with N spokes.
func setupHubWithNSpokes(t *testing.T, ctx context.Context, n int) (*HubSpokeSetup, []*HubSpokeSetup) {
t.Helper()
baseDir, err := os.MkdirTemp("", "hub-spokes-test-*")
if err != nil {
t.Fatalf("failed to create base dir: %v", err)
}
// Setup Hub
hubDir := filepath.Join(baseDir, "hub")
if err := os.MkdirAll(hubDir, 0755); err != nil {
os.RemoveAll(baseDir)
t.Fatalf("failed to create hub dir: %v", err)
}
// Initialize dolt repo for hub
cmd := exec.Command("dolt", "init")
cmd.Dir = hubDir
if err := cmd.Run(); err != nil {
os.RemoveAll(baseDir)
t.Fatalf("failed to init hub dolt repo: %v", err)
}
// Start hub server
hubServer := NewServer(ServerConfig{
DataDir: hubDir,
SQLPort: 14307,
RemotesAPIPort: 19081,
Host: "127.0.0.1",
LogFile: filepath.Join(hubDir, "server.log"),
})
if err := hubServer.Start(ctx); err != nil {
os.RemoveAll(baseDir)
t.Fatalf("failed to start hub server: %v", err)
}
// Connect hub store and create genesis
hubStore, err := New(ctx, &Config{
Path: hubDir,
Database: "beads",
ServerMode: true,
ServerHost: "127.0.0.1",
ServerPort: 14307,
CommitterName: "hub-mayor",
CommitterEmail: "hub@test.local",
})
if err != nil {
hubServer.Stop()
os.RemoveAll(baseDir)
t.Fatalf("failed to create hub store: %v", err)
}
if err := hubStore.SetConfig(ctx, "issue_prefix", "hub"); err != nil {
hubStore.Close()
hubServer.Stop()
os.RemoveAll(baseDir)
t.Fatalf("failed to set hub prefix: %v", err)
}
if err := hubStore.Commit(ctx, "Hub genesis commit"); err != nil {
t.Logf("Hub genesis commit: %v", err)
}
hub := &HubSpokeSetup{
name: "hub",
dir: hubDir,
server: hubServer,
store: hubStore,
cleanup: func() {
hubStore.Close()
hubServer.Stop()
},
}
// Create spokes by cloning from hub
hubRemoteURL := fmt.Sprintf("http://127.0.0.1:%d/beads", hubServer.RemotesAPIPort())
spokes := make([]*HubSpokeSetup, n)
for i := 0; i < n; i++ {
spokeName := fmt.Sprintf("spoke%d", i+1)
spokeDir := filepath.Join(baseDir, spokeName)
// Clone from hub
cmd := exec.Command("dolt", "clone", hubRemoteURL, spokeDir)
if output, err := cmd.CombinedOutput(); err != nil {
hub.cleanup()
for j := 0; j < i; j++ {
spokes[j].cleanup()
}
os.RemoveAll(baseDir)
t.Fatalf("failed to clone spoke%d from hub: %v\nOutput: %s", i+1, err, output)
}
// Start spoke server on unique ports
spokeServer := NewServer(ServerConfig{
DataDir: spokeDir,
SQLPort: 14308 + i,
RemotesAPIPort: 19082 + i,
Host: "127.0.0.1",
LogFile: filepath.Join(spokeDir, "server.log"),
})
if err := spokeServer.Start(ctx); err != nil {
hub.cleanup()
for j := 0; j < i; j++ {
spokes[j].cleanup()
}
os.RemoveAll(baseDir)
t.Fatalf("failed to start spoke%d server: %v", i+1, err)
}
// Connect spoke store
spokeStore, err := New(ctx, &Config{
Path: spokeDir,
Database: "beads",
ServerMode: true,
ServerHost: "127.0.0.1",
ServerPort: 14308 + i,
CommitterName: spokeName,
CommitterEmail: fmt.Sprintf("%s@test.local", spokeName),
})
if err != nil {
spokeServer.Stop()
hub.cleanup()
for j := 0; j < i; j++ {
spokes[j].cleanup()
}
os.RemoveAll(baseDir)
t.Fatalf("failed to create spoke%d store: %v", i+1, err)
}
// Configure spoke's prefix
if err := spokeStore.SetConfig(ctx, "issue_prefix", spokeName); err != nil {
spokeStore.Close()
spokeServer.Stop()
hub.cleanup()
for j := 0; j < i; j++ {
spokes[j].cleanup()
}
os.RemoveAll(baseDir)
t.Fatalf("failed to set spoke%d prefix: %v", i+1, err)
}
if err := spokeStore.Commit(ctx, fmt.Sprintf("Spoke%d configuration", i+1)); err != nil {
t.Logf("Spoke%d config commit: %v", i+1, err)
}
// Add origin remote explicitly (SQL server doesn't see CLI-configured remotes)
if err := spokeStore.AddRemote(ctx, "origin", hubRemoteURL); err != nil {
t.Logf("Spoke%d add origin remote: %v", i+1, err)
}
spokes[i] = &HubSpokeSetup{
name: spokeName,
dir: spokeDir,
server: spokeServer,
store: spokeStore,
cleanup: func() {
spokeStore.Close()
spokeServer.Stop()
if i == n-1 {
os.RemoveAll(baseDir)
}
},
}
}
t.Logf("Hub+Spokes ready: Hub (SQL:%d, API:%d)", hubServer.SQLPort(), hubServer.RemotesAPIPort())
for i, s := range spokes {
t.Logf(" Spoke%d (SQL:%d, API:%d)", i+1, s.server.SQLPort(), s.server.RemotesAPIPort())
}
return hub, spokes
}