diff --git a/internal/cmd/up.go b/internal/cmd/up.go
index e4820fbf..e28245ea 100644
--- a/internal/cmd/up.go
+++ b/internal/cmd/up.go
@@ -6,6 +6,7 @@ import (
 	"os/exec"
 	"path/filepath"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/spf13/cobra"
@@ -18,12 +19,23 @@ import (
 	"github.com/steveyegge/gastown/internal/mayor"
 	"github.com/steveyegge/gastown/internal/polecat"
 	"github.com/steveyegge/gastown/internal/refinery"
+	"github.com/steveyegge/gastown/internal/rig"
 	"github.com/steveyegge/gastown/internal/style"
 	"github.com/steveyegge/gastown/internal/tmux"
 	"github.com/steveyegge/gastown/internal/witness"
 	"github.com/steveyegge/gastown/internal/workspace"
 )
 
+// agentStartResult holds the result of starting an agent.
+type agentStartResult struct {
+	name   string // Display name like "Witness (gastown)"
+	ok     bool   // Whether start succeeded
+	detail string // Status detail (session name or error)
+}
+
+// maxConcurrentAgentStarts limits parallel agent startups to avoid resource exhaustion.
+const maxConcurrentAgentStarts = 10
+
 var upCmd = &cobra.Command{
 	Use:     "up",
 	GroupID: GroupServices,
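Reviewer note on the constant above: the patch enforces `maxConcurrentAgentStarts` with a fixed-size worker pool (see `startRigAgentsWithPrefetch` below), while the new tests also exercise the equivalent buffered-channel semaphore idiom. A minimal self-contained sketch of that semaphore pattern; the names `startOneAgent` and `maxConcurrent` are hypothetical and not part of this patch:

```go
package main

import (
	"fmt"
	"sync"
)

func startOneAgent(id int) { fmt.Println("starting agent", id) } // hypothetical work

func main() {
	const maxConcurrent = 10 // plays the role of maxConcurrentAgentStarts
	sem := make(chan struct{}, maxConcurrent)

	var wg sync.WaitGroup
	for i := 0; i < 25; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks once all slots are taken
			defer func() { <-sem }() // release
			startOneAgent(id)
		}(i)
	}
	wg.Wait()
}
```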
@@ -70,89 +82,108 @@ func runUp(cmd *cobra.Command, args []string) error {
 	allOK := true
 
-	// 1. Daemon (Go process)
-	if err := ensureDaemon(townRoot); err != nil {
-		printStatus("Daemon", false, err.Error())
-		allOK = false
-	} else {
-		running, pid, _ := daemon.IsRunning(townRoot)
-		if running {
-			printStatus("Daemon", true, fmt.Sprintf("PID %d", pid))
-		}
-	}
-
-	// 2. Deacon (Claude agent)
-	deaconMgr := deacon.NewManager(townRoot)
-	if err := deaconMgr.Start(""); err != nil {
-		if err == deacon.ErrAlreadyRunning {
-			printStatus("Deacon", true, deaconMgr.SessionName())
-		} else {
-			printStatus("Deacon", false, err.Error())
-			allOK = false
-		}
-	} else {
-		printStatus("Deacon", true, deaconMgr.SessionName())
-	}
-
-	// 3. Mayor (Claude agent)
-	mayorMgr := mayor.NewManager(townRoot)
-	if err := mayorMgr.Start(""); err != nil {
-		if err == mayor.ErrAlreadyRunning {
-			printStatus("Mayor", true, mayorMgr.SessionName())
-		} else {
-			printStatus("Mayor", false, err.Error())
-			allOK = false
-		}
-	} else {
-		printStatus("Mayor", true, mayorMgr.SessionName())
-	}
-
-	// 4. Witnesses (one per rig)
+	// Discover rigs early so we can prefetch while daemon/deacon/mayor start
 	rigs := discoverRigs(townRoot)
-	for _, rigName := range rigs {
-		_, r, err := getRig(rigName)
-		if err != nil {
-			printStatus(fmt.Sprintf("Witness (%s)", rigName), false, err.Error())
-			allOK = false
-			continue
-		}
-		mgr := witness.NewManager(r)
-		if err := mgr.Start(false, "", nil); err != nil {
-			if err == witness.ErrAlreadyRunning {
-				printStatus(fmt.Sprintf("Witness (%s)", rigName), true, mgr.SessionName())
+
+	// Start daemon, deacon, mayor, and rig prefetch in parallel
+	var daemonErr error
+	var daemonPID int
+	var deaconResult, mayorResult agentStartResult
+	var prefetchedRigs map[string]*rig.Rig
+	var rigErrors map[string]error
+
+	var startupWg sync.WaitGroup
+	startupWg.Add(4)
+
+	// 1. Daemon (Go process)
+	go func() {
+		defer startupWg.Done()
+		if err := ensureDaemon(townRoot); err != nil {
+			daemonErr = err
+		} else {
+			running, pid, _ := daemon.IsRunning(townRoot)
+			if running {
+				daemonPID = pid
+			}
+		}
+	}()
+
+	// 2. Deacon
+	go func() {
+		defer startupWg.Done()
+		deaconMgr := deacon.NewManager(townRoot)
+		if err := deaconMgr.Start(""); err != nil {
+			if err == deacon.ErrAlreadyRunning {
+				deaconResult = agentStartResult{name: "Deacon", ok: true, detail: deaconMgr.SessionName()}
 			} else {
-				printStatus(fmt.Sprintf("Witness (%s)", rigName), false, err.Error())
-				allOK = false
+				deaconResult = agentStartResult{name: "Deacon", ok: false, detail: err.Error()}
 			}
 		} else {
-			printStatus(fmt.Sprintf("Witness (%s)", rigName), true, mgr.SessionName())
+			deaconResult = agentStartResult{name: "Deacon", ok: true, detail: deaconMgr.SessionName()}
+		}
+	}()
+
+	// 3. Mayor
+	go func() {
+		defer startupWg.Done()
+		mayorMgr := mayor.NewManager(townRoot)
+		if err := mayorMgr.Start(""); err != nil {
+			if err == mayor.ErrAlreadyRunning {
+				mayorResult = agentStartResult{name: "Mayor", ok: true, detail: mayorMgr.SessionName()}
+			} else {
+				mayorResult = agentStartResult{name: "Mayor", ok: false, detail: err.Error()}
+			}
+		} else {
+			mayorResult = agentStartResult{name: "Mayor", ok: true, detail: mayorMgr.SessionName()}
+		}
+	}()
+
+	// 4. Prefetch rig configs (overlaps with daemon/deacon/mayor startup)
+	go func() {
+		defer startupWg.Done()
+		prefetchedRigs, rigErrors = prefetchRigs(rigs)
+	}()
+
+	startupWg.Wait()
+
+	// Print daemon/deacon/mayor results
+	if daemonErr != nil {
+		printStatus("Daemon", false, daemonErr.Error())
+		allOK = false
+	} else if daemonPID > 0 {
+		printStatus("Daemon", true, fmt.Sprintf("PID %d", daemonPID))
+	}
+	printStatus(deaconResult.name, deaconResult.ok, deaconResult.detail)
+	if !deaconResult.ok {
+		allOK = false
+	}
+	printStatus(mayorResult.name, mayorResult.ok, mayorResult.detail)
+	if !mayorResult.ok {
+		allOK = false
+	}
+
+	// 5 & 6. Witnesses and Refineries (using prefetched rigs)
+	witnessResults, refineryResults := startRigAgentsWithPrefetch(rigs, prefetchedRigs, rigErrors)
+
+	// Print results in order: all witnesses first, then all refineries
+	for _, rigName := range rigs {
+		if result, ok := witnessResults[rigName]; ok {
+			printStatus(result.name, result.ok, result.detail)
+			if !result.ok {
+				allOK = false
+			}
+		}
+	}
+	for _, rigName := range rigs {
+		if result, ok := refineryResults[rigName]; ok {
+			printStatus(result.name, result.ok, result.detail)
+			if !result.ok {
+				allOK = false
+			}
 		}
 	}
 
-	// 5. Refineries (one per rig)
-	for _, rigName := range rigs {
-		_, r, err := getRig(rigName)
-		if err != nil {
-			printStatus(fmt.Sprintf("Refinery (%s)", rigName), false, err.Error())
-			allOK = false
-			continue
-		}
-
-		mgr := refinery.NewManager(r)
-		if err := mgr.Start(false, ""); err != nil {
-			if err == refinery.ErrAlreadyRunning {
-				printStatus(fmt.Sprintf("Refinery (%s)", rigName), true, mgr.SessionName())
-			} else {
-				printStatus(fmt.Sprintf("Refinery (%s)", rigName), false, err.Error())
-				allOK = false
-			}
-		} else {
-			printStatus(fmt.Sprintf("Refinery (%s)", rigName), true, mgr.SessionName())
-		}
-	}
-
-	// 6. Crew (if --restore)
+	// 7. Crew (if --restore)
 	if upRestore {
 		for _, rigName := range rigs {
 			crewStarted, crewErrors := startCrewFromSettings(townRoot, rigName)
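Reviewer note on the hunk above: the four startup goroutines write to plain variables (`daemonErr`, `daemonPID`, `deaconResult`, `mayorResult`, `prefetchedRigs`, `rigErrors`) with no mutex. That is safe because each variable has exactly one writer, and `startupWg.Wait()` establishes a happens-before edge, so all of those writes are visible once `Wait` returns. A minimal sketch of the same pattern; `fetchA` and `fetchB` are hypothetical stand-ins for the real startup work:

```go
package main

import (
	"fmt"
	"sync"
)

func fetchA() string { return "A done" } // hypothetical stand-ins for startup work
func fetchB() string { return "B done" }

func main() {
	var resultA, resultB string // each written by exactly one goroutine

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); resultA = fetchA() }()
	go func() { defer wg.Done(); resultB = fetchB() }()

	// Wait creates a happens-before edge: once it returns, both writes
	// above are visible to this goroutine without further locking.
	wg.Wait()
	fmt.Println(resultA, resultB)
}
```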
@@ -249,6 +280,182 @@ func ensureDaemon(townRoot string) error {
 	return nil
 }
 
+// rigPrefetchResult holds the result of loading a single rig config.
+type rigPrefetchResult struct {
+	index int
+	rig   *rig.Rig
+	err   error
+}
+
+// prefetchRigs loads all rig configs in parallel for faster agent startup.
+// Returns a map of rig name to loaded Rig, and any errors encountered.
+func prefetchRigs(rigNames []string) (map[string]*rig.Rig, map[string]error) {
+	n := len(rigNames)
+	if n == 0 {
+		return make(map[string]*rig.Rig), make(map[string]error)
+	}
+
+	// Use channel to collect results without locking
+	results := make(chan rigPrefetchResult, n)
+
+	for i, name := range rigNames {
+		go func(idx int, rigName string) {
+			_, r, err := getRig(rigName)
+			results <- rigPrefetchResult{index: idx, rig: r, err: err}
+		}(i, name)
+	}
+
+	// Collect results - pre-allocate maps with capacity
+	rigs := make(map[string]*rig.Rig, n)
+	errors := make(map[string]error)
+
+	for i := 0; i < n; i++ {
+		res := <-results
+		name := rigNames[res.index]
+		if res.err != nil {
+			errors[name] = res.err
+		} else {
+			rigs[name] = res.rig
+		}
+	}
+
+	return rigs, errors
+}
+
+// agentTask represents a unit of work for the agent worker pool.
+type agentTask struct {
+	rigName   string
+	rigObj    *rig.Rig
+	isWitness bool // true for witness, false for refinery
+}
+
+// agentResultMsg carries result back from worker to collector.
+type agentResultMsg struct {
+	rigName   string
+	isWitness bool
+	result    agentStartResult
+}
+
+// startRigAgentsParallel starts all Witnesses and Refineries concurrently.
+// Discovers and prefetches rigs internally. For use when rigs aren't pre-loaded.
+func startRigAgentsParallel(rigNames []string) (witnessResults, refineryResults map[string]agentStartResult) {
+	prefetchedRigs, rigErrors := prefetchRigs(rigNames)
+	return startRigAgentsWithPrefetch(rigNames, prefetchedRigs, rigErrors)
+}
+
+// startRigAgentsWithPrefetch starts all Witnesses and Refineries using pre-loaded rig configs.
+// Uses a worker pool with fixed goroutine count to limit concurrency and reduce overhead.
+func startRigAgentsWithPrefetch(rigNames []string, prefetchedRigs map[string]*rig.Rig, rigErrors map[string]error) (witnessResults, refineryResults map[string]agentStartResult) {
+	n := len(rigNames)
+	witnessResults = make(map[string]agentStartResult, n)
+	refineryResults = make(map[string]agentStartResult, n)
+
+	if n == 0 {
+		return
+	}
+
+	// Record errors for rigs that failed to load
+	for rigName, err := range rigErrors {
+		errDetail := err.Error()
+		witnessResults[rigName] = agentStartResult{
+			name:   "Witness (" + rigName + ")",
+			ok:     false,
+			detail: errDetail,
+		}
+		refineryResults[rigName] = agentStartResult{
+			name:   "Refinery (" + rigName + ")",
+			ok:     false,
+			detail: errDetail,
+		}
+	}
+
+	numTasks := len(prefetchedRigs) * 2 // witness + refinery per rig
+	if numTasks == 0 {
+		return
+	}
+
+	// Task channel and result channel
+	tasks := make(chan agentTask, numTasks)
+	results := make(chan agentResultMsg, numTasks)
+
+	// Start fixed worker pool (bounded by maxConcurrentAgentStarts)
+	numWorkers := maxConcurrentAgentStarts
+	if numTasks < numWorkers {
+		numWorkers = numTasks
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for task := range tasks {
+				var result agentStartResult
+				if task.isWitness {
+					result = upStartWitness(task.rigName, task.rigObj)
+				} else {
+					result = upStartRefinery(task.rigName, task.rigObj)
+				}
+				results <- agentResultMsg{
+					rigName:   task.rigName,
+					isWitness: task.isWitness,
+					result:    result,
+				}
+			}
+		}()
+	}
+
+	// Enqueue all tasks
+	for rigName, r := range prefetchedRigs {
+		tasks <- agentTask{rigName: rigName, rigObj: r, isWitness: true}
+		tasks <- agentTask{rigName: rigName, rigObj: r, isWitness: false}
+	}
+	close(tasks)
+
+	// Close results channel when workers are done
+	go func() {
+		wg.Wait()
+		close(results)
+	}()
+
+	// Collect results - no locking needed, single goroutine collects
+	for msg := range results {
+		if msg.isWitness {
+			witnessResults[msg.rigName] = msg.result
+		} else {
+			refineryResults[msg.rigName] = msg.result
+		}
+	}
+
+	return
+}
+
+// upStartWitness starts a witness for the given rig and returns a result struct.
+func upStartWitness(rigName string, r *rig.Rig) agentStartResult {
+	name := "Witness (" + rigName + ")"
+	mgr := witness.NewManager(r)
+	if err := mgr.Start(false, "", nil); err != nil {
+		if err == witness.ErrAlreadyRunning {
+			return agentStartResult{name: name, ok: true, detail: mgr.SessionName()}
+		}
+		return agentStartResult{name: name, ok: false, detail: err.Error()}
+	}
+	return agentStartResult{name: name, ok: true, detail: mgr.SessionName()}
+}
+
+// upStartRefinery starts a refinery for the given rig and returns a result struct.
+func upStartRefinery(rigName string, r *rig.Rig) agentStartResult {
+	name := "Refinery (" + rigName + ")"
+	mgr := refinery.NewManager(r)
+	if err := mgr.Start(false, ""); err != nil {
+		if err == refinery.ErrAlreadyRunning {
+			return agentStartResult{name: name, ok: true, detail: mgr.SessionName()}
+		}
+		return agentStartResult{name: name, ok: false, detail: err.Error()}
+	}
+	return agentStartResult{name: name, ok: true, detail: mgr.SessionName()}
+}
+
 // discoverRigs finds all rigs in the town.
 func discoverRigs(townRoot string) []string {
 	var rigs []string
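Reviewer note before the tests: the fan-out/fan-in shape used by `startRigAgentsWithPrefetch` is worth seeing in isolation. A minimal self-contained sketch (not the gastown code; the int task and string result types are hypothetical): N workers drain a closed task channel, one closer goroutine runs `wg.Wait()` then `close(results)`, and a single collector ranges over results, so no mutex is needed anywhere.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 3 // plays the role of maxConcurrentAgentStarts
	taskIDs := []int{1, 2, 3, 4, 5, 6, 7, 8}

	tasks := make(chan int, len(taskIDs))
	results := make(chan string, len(taskIDs)) // buffered so workers never block sending

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range tasks { // loop ends when tasks is closed and drained
				results <- fmt.Sprintf("task %d done", id)
			}
		}()
	}

	for _, id := range taskIDs {
		tasks <- id
	}
	close(tasks)

	go func() {
		wg.Wait()      // all workers finished, no more sends
		close(results) // lets the collector loop below terminate
	}()

	for r := range results { // single collector goroutine: no locking needed
		fmt.Println(r)
	}
}
```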
diff --git a/internal/cmd/up_test.go b/internal/cmd/up_test.go
new file mode 100644
index 00000000..5d8ef51b
--- /dev/null
+++ b/internal/cmd/up_test.go
@@ -0,0 +1,216 @@
+package cmd
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/steveyegge/gastown/internal/rig"
+)
+
+func TestAgentStartResult_Fields(t *testing.T) {
+	result := agentStartResult{
+		name:   "Witness (gastown)",
+		ok:     true,
+		detail: "gt-gastown-witness",
+	}
+
+	if result.name != "Witness (gastown)" {
+		t.Errorf("name = %q, want %q", result.name, "Witness (gastown)")
+	}
+	if !result.ok {
+		t.Error("ok should be true")
+	}
+	if result.detail != "gt-gastown-witness" {
+		t.Errorf("detail = %q, want %q", result.detail, "gt-gastown-witness")
+	}
+}
+
+func TestMaxConcurrentAgentStarts_Constant(t *testing.T) {
+	// Verify the constant is set to a reasonable value
+	if maxConcurrentAgentStarts < 1 {
+		t.Errorf("maxConcurrentAgentStarts = %d, should be >= 1", maxConcurrentAgentStarts)
+	}
+	if maxConcurrentAgentStarts > 100 {
+		t.Errorf("maxConcurrentAgentStarts = %d, should be <= 100 to prevent resource exhaustion", maxConcurrentAgentStarts)
+	}
+}
+
+func TestSemaphoreLimitsConcurrency(t *testing.T) {
+	// Test that a semaphore pattern properly limits concurrency
+	const maxConcurrent = 3
+	const totalTasks = 10
+
+	sem := make(chan struct{}, maxConcurrent)
+	var wg sync.WaitGroup
+	var maxObserved int32
+	var current int32
+
+	for i := 0; i < totalTasks; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			// Acquire semaphore
+			sem <- struct{}{}
+			defer func() { <-sem }()
+
+			// Track concurrent count
+			cur := atomic.AddInt32(&current, 1)
+			defer atomic.AddInt32(&current, -1)
+
+			// Update max observed
+			for {
+				max := atomic.LoadInt32(&maxObserved)
+				if cur <= max || atomic.CompareAndSwapInt32(&maxObserved, max, cur) {
+					break
+				}
+			}
+
+			// Simulate work
+			time.Sleep(10 * time.Millisecond)
+		}()
+	}
+
+	wg.Wait()
+
+	if maxObserved > maxConcurrent {
+		t.Errorf("max concurrent = %d, should not exceed %d", maxObserved, maxConcurrent)
+	}
+}
+
+func TestStartRigAgentsParallel_EmptyRigs(t *testing.T) {
+	// Test with empty rig list - should return empty maps without error
+	witnessResults, refineryResults := startRigAgentsParallel([]string{})
+
+	if len(witnessResults) != 0 {
+		t.Errorf("witnessResults should be empty, got %d entries", len(witnessResults))
+	}
+	if len(refineryResults) != 0 {
+		t.Errorf("refineryResults should be empty, got %d entries", len(refineryResults))
+	}
+}
+
+func TestStartRigAgentsWithPrefetch_EmptyRigs(t *testing.T) {
+	// Test with empty inputs
+	witnessResults, refineryResults := startRigAgentsWithPrefetch(
+		[]string{},
+		make(map[string]*rig.Rig),
+		make(map[string]error),
+	)
+
+	if len(witnessResults) != 0 {
+		t.Errorf("witnessResults should be empty, got %d entries", len(witnessResults))
+	}
+	if len(refineryResults) != 0 {
+		t.Errorf("refineryResults should be empty, got %d entries", len(refineryResults))
+	}
+}
+
+func TestStartRigAgentsWithPrefetch_RecordsErrors(t *testing.T) {
+	// Test that rig errors are properly recorded
+	rigErrors := map[string]error{
+		"badrig": fmt.Errorf("rig not found"),
+	}
+
+	witnessResults, refineryResults := startRigAgentsWithPrefetch(
+		[]string{"badrig"},
+		make(map[string]*rig.Rig),
+		rigErrors,
+	)
+
+	if len(witnessResults) != 1 {
+		t.Errorf("witnessResults should have 1 entry, got %d", len(witnessResults))
+	}
+	if result, ok := witnessResults["badrig"]; !ok {
+		t.Error("witnessResults should have badrig entry")
+	} else if result.ok {
+		t.Error("badrig witness result should not be ok")
+	}
+
+	if len(refineryResults) != 1 {
+		t.Errorf("refineryResults should have 1 entry, got %d", len(refineryResults))
+	}
+	if result, ok := refineryResults["badrig"]; !ok {
+		t.Error("refineryResults should have badrig entry")
+	} else if result.ok {
+		t.Error("badrig refinery result should not be ok")
+	}
+}
+
+func TestPrefetchRigs_Empty(t *testing.T) {
+	// Test with empty rig list
+	rigs, errors := prefetchRigs([]string{})
+
+	if len(rigs) != 0 {
+		t.Errorf("rigs should be empty, got %d entries", len(rigs))
+	}
+	if len(errors) != 0 {
+		t.Errorf("errors should be empty, got %d entries", len(errors))
+	}
+}
+
+func TestWorkerPoolLimitsConcurrency(t *testing.T) {
+	// Test that a worker pool pattern properly limits concurrency
+	const numWorkers = 3
+	const numTasks = 15
+
+	tasks := make(chan int, numTasks)
+	results := make(chan int, numTasks)
+
+	var maxObserved int32
+	var current int32
+
+	// Start worker pool
+	var wg sync.WaitGroup
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for range tasks {
+				// Track concurrent count
+				cur := atomic.AddInt32(&current, 1)
+
+				// Update max observed
+				for {
+					max := atomic.LoadInt32(&maxObserved)
+					if cur <= max || atomic.CompareAndSwapInt32(&maxObserved, max, cur) {
+						break
+					}
+				}
+
+				// Simulate work
+				time.Sleep(5 * time.Millisecond)
+
+				atomic.AddInt32(&current, -1)
+				results <- 1
+			}
+		}()
+	}
+
+	// Enqueue tasks
+	for i := 0; i < numTasks; i++ {
+		tasks <- i
+	}
+	close(tasks)
+
+	// Wait for workers and collect results
+	go func() {
+		wg.Wait()
+		close(results)
+	}()
+
+	count := 0
+	for range results {
+		count++
+	}
+
+	if count != numTasks {
+		t.Errorf("expected %d results, got %d", numTasks, count)
+	}
+	if maxObserved > numWorkers {
+		t.Errorf("max concurrent = %d, should not exceed %d workers", maxObserved, numWorkers)
+	}
+}
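Closing note: both concurrency tests track the peak goroutine count with the same lock-free compare-and-swap loop. Pulled out on its own, the idiom looks like this; `recordMax` is a hypothetical helper, not part of the patch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recordMax raises *peak to cur if cur is larger. The CAS loop retries when
// another goroutine updates peak between the Load and the CompareAndSwap.
func recordMax(peak *int32, cur int32) {
	for {
		prev := atomic.LoadInt32(peak)
		if cur <= prev || atomic.CompareAndSwapInt32(peak, prev, cur) {
			return
		}
	}
}

func main() {
	var peak int32
	var wg sync.WaitGroup
	for i := int32(1); i <= 100; i++ {
		wg.Add(1)
		go func(v int32) {
			defer wg.Done()
			recordMax(&peak, v)
		}(i)
	}
	wg.Wait()
	fmt.Println(peak) // always prints 100
}
```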