From 069fe0f2856beec445ac87481d73d44d10ecd289 Mon Sep 17 00:00:00 2001 From: jack Date: Mon, 12 Jan 2026 16:38:39 -0800 Subject: [PATCH] feat(start): parallelize agent startup for faster boot Start Mayor, Deacon, rig agents, and crew all in parallel rather than sequentially. This reduces worst-case startup from N*60s to ~60s since all agents can start concurrently. Closes gt-dgbwk Co-Authored-By: Claude Opus 4.5 --- internal/cmd/start.go | 380 +++++++++++++++++++++++++++++++----------- 1 file changed, 280 insertions(+), 100 deletions(-) diff --git a/internal/cmd/start.go b/internal/cmd/start.go index 64270039..2f5c6a29 100644 --- a/internal/cmd/start.go +++ b/internal/cmd/start.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/spf13/cobra" @@ -165,23 +166,46 @@ func runStart(cmd *cobra.Command, args []string) error { t := tmux.NewTmux() fmt.Printf("Starting Gas Town from %s\n\n", style.Dim.Render(townRoot)) - - // Start core agents (Mayor and Deacon) - if err := startCoreAgents(townRoot, startAgentOverride); err != nil { - return err - } - - // If --all, start witnesses and refineries for all rigs - if startAll { - fmt.Println() - fmt.Println("Starting rig agents...") - startRigAgents(t, townRoot) - } - - // Auto-start configured crew for each rig + fmt.Println("Starting all agents in parallel...") fmt.Println() - fmt.Println("Starting configured crew...") - startConfiguredCrew(t, townRoot) + + // Start all agent groups in parallel for maximum speed + var wg sync.WaitGroup + var mu sync.Mutex // Protects stdout + var coreErr error + + // Start core agents (Mayor and Deacon) in background + wg.Add(1) + go func() { + defer wg.Done() + if err := startCoreAgentsParallel(townRoot, startAgentOverride, &mu); err != nil { + mu.Lock() + coreErr = err + mu.Unlock() + } + }() + + // Start rig agents (witnesses, refineries) if --all + if startAll { + wg.Add(1) + go func() { + defer wg.Done() + startRigAgentsParallel(t, townRoot, &mu) + }() + } + + // Start configured crew + wg.Add(1) + go func() { + defer wg.Done() + startConfiguredCrewParallel(t, townRoot, &mu) + }() + + wg.Wait() + + if coreErr != nil { + return coreErr + } fmt.Println() fmt.Printf("%s Gas Town is running\n", style.Bold.Render("✓")) @@ -193,36 +217,72 @@ func runStart(cmd *cobra.Command, args []string) error { return nil } -// startCoreAgents starts Mayor and Deacon sessions using the Manager pattern. -func startCoreAgents(townRoot string, agentOverride string) error { - // Start Mayor first (so Deacon sees it as up) - mayorMgr := mayor.NewManager(townRoot) - if err := mayorMgr.Start(agentOverride); err != nil { - if err == mayor.ErrAlreadyRunning { - fmt.Printf(" %s Mayor already running\n", style.Dim.Render("○")) - } else { - return fmt.Errorf("starting Mayor: %w", err) - } - } else { - fmt.Printf(" %s Mayor started\n", style.Bold.Render("✓")) - } +// startCoreAgentsParallel starts Mayor and Deacon sessions in parallel using the Manager pattern. +// The mutex is used to synchronize output with other parallel startup operations. +func startCoreAgentsParallel(townRoot string, agentOverride string, mu *sync.Mutex) error { + var wg sync.WaitGroup + var firstErr error + var errMu sync.Mutex - // Start Deacon (health monitor) - deaconMgr := deacon.NewManager(townRoot) - if err := deaconMgr.Start(agentOverride); err != nil { - if err == deacon.ErrAlreadyRunning { - fmt.Printf(" %s Deacon already running\n", style.Dim.Render("○")) + // Start Mayor in goroutine + wg.Add(1) + go func() { + defer wg.Done() + mayorMgr := mayor.NewManager(townRoot) + if err := mayorMgr.Start(agentOverride); err != nil { + if err == mayor.ErrAlreadyRunning { + mu.Lock() + fmt.Printf(" %s Mayor already running\n", style.Dim.Render("○")) + mu.Unlock() + } else { + errMu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("starting Mayor: %w", err) + } + errMu.Unlock() + mu.Lock() + fmt.Printf(" %s Mayor failed: %v\n", style.Dim.Render("○"), err) + mu.Unlock() + } } else { - return fmt.Errorf("starting Deacon: %w", err) + mu.Lock() + fmt.Printf(" %s Mayor started\n", style.Bold.Render("✓")) + mu.Unlock() } - } else { - fmt.Printf(" %s Deacon started\n", style.Bold.Render("✓")) - } + }() - return nil + // Start Deacon in goroutine + wg.Add(1) + go func() { + defer wg.Done() + deaconMgr := deacon.NewManager(townRoot) + if err := deaconMgr.Start(agentOverride); err != nil { + if err == deacon.ErrAlreadyRunning { + mu.Lock() + fmt.Printf(" %s Deacon already running\n", style.Dim.Render("○")) + mu.Unlock() + } else { + errMu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("starting Deacon: %w", err) + } + errMu.Unlock() + mu.Lock() + fmt.Printf(" %s Deacon failed: %v\n", style.Dim.Render("○"), err) + mu.Unlock() + } + } else { + mu.Lock() + fmt.Printf(" %s Deacon started\n", style.Bold.Render("✓")) + mu.Unlock() + } + }() + + wg.Wait() + return firstErr } -// startRigAgents starts witness and refinery for all rigs. +// startRigAgents starts witness and refinery for all rigs in parallel. // Called when --all flag is passed to gt start. func startRigAgents(t *tmux.Tmux, townRoot string) { rigs, err := discoverAllRigs(townRoot) @@ -231,40 +291,65 @@ func startRigAgents(t *tmux.Tmux, townRoot string) { return } - for _, r := range rigs { - // Start Witness - witnessSession := fmt.Sprintf("gt-%s-witness", r.Name) - witnessRunning, _ := t.HasSession(witnessSession) - if witnessRunning { - fmt.Printf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name) - } else { - witMgr := witness.NewManager(r) - if err := witMgr.Start(false, "", nil); err != nil { - if err == witness.ErrAlreadyRunning { - fmt.Printf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name) - } else { - fmt.Printf(" %s %s witness failed: %v\n", style.Dim.Render("○"), r.Name, err) - } - } else { - fmt.Printf(" %s %s witness started\n", style.Bold.Render("✓"), r.Name) - } - } + var wg sync.WaitGroup + var mu sync.Mutex // Protects stdout - // Start Refinery - refineryMgr := refinery.NewManager(r) - if err := refineryMgr.Start(false); err != nil { - if errors.Is(err, refinery.ErrAlreadyRunning) { - fmt.Printf(" %s %s refinery already running\n", style.Dim.Render("○"), r.Name) - } else { - fmt.Printf(" %s %s refinery failed: %v\n", style.Dim.Render("○"), r.Name, err) - } - } else { - fmt.Printf(" %s %s refinery started\n", style.Bold.Render("✓"), r.Name) - } + for _, r := range rigs { + wg.Add(2) // Witness + Refinery + + // Start Witness in goroutine + go func(r *rig.Rig) { + defer wg.Done() + msg := startWitnessForRig(t, r) + mu.Lock() + fmt.Print(msg) + mu.Unlock() + }(r) + + // Start Refinery in goroutine + go func(r *rig.Rig) { + defer wg.Done() + msg := startRefineryForRig(r) + mu.Lock() + fmt.Print(msg) + mu.Unlock() + }(r) } + + wg.Wait() } -// startConfiguredCrew starts crew members configured in rig settings. +// startWitnessForRig starts the witness for a single rig and returns a status message. +func startWitnessForRig(t *tmux.Tmux, r *rig.Rig) string { + witnessSession := fmt.Sprintf("gt-%s-witness", r.Name) + witnessRunning, _ := t.HasSession(witnessSession) + if witnessRunning { + return fmt.Sprintf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name) + } + + witMgr := witness.NewManager(r) + if err := witMgr.Start(false, "", nil); err != nil { + if err == witness.ErrAlreadyRunning { + return fmt.Sprintf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name) + } + return fmt.Sprintf(" %s %s witness failed: %v\n", style.Dim.Render("○"), r.Name, err) + } + return fmt.Sprintf(" %s %s witness started\n", style.Bold.Render("✓"), r.Name) +} + +// startRefineryForRig starts the refinery for a single rig and returns a status message. +func startRefineryForRig(r *rig.Rig) string { + refineryMgr := refinery.NewManager(r) + if err := refineryMgr.Start(false); err != nil { + if errors.Is(err, refinery.ErrAlreadyRunning) { + return fmt.Sprintf(" %s %s refinery already running\n", style.Dim.Render("○"), r.Name) + } + return fmt.Sprintf(" %s %s refinery failed: %v\n", style.Dim.Render("○"), r.Name, err) + } + return fmt.Sprintf(" %s %s refinery started\n", style.Bold.Render("✓"), r.Name) +} + +// startConfiguredCrew starts crew members configured in rig settings in parallel. func startConfiguredCrew(t *tmux.Tmux, townRoot string) { rigs, err := discoverAllRigs(townRoot) if err != nil { @@ -272,50 +357,145 @@ func startConfiguredCrew(t *tmux.Tmux, townRoot string) { return } + var wg sync.WaitGroup + var mu sync.Mutex // Protects stdout and startedAny startedAny := false + for _, r := range rigs { crewToStart := getCrewToStart(r) for _, crewName := range crewToStart { - sessionID := crewSessionName(r.Name, crewName) - if running, _ := t.HasSession(sessionID); running { - // Session exists - check if Claude is still running - agentCfg := config.ResolveAgentConfig(townRoot, r.Path) - if !t.IsAgentRunning(sessionID, config.ExpectedPaneCommands(agentCfg)...) { - // Claude has exited, restart it - fmt.Printf(" %s %s/%s session exists, restarting Claude...\n", style.Dim.Render("○"), r.Name, crewName) - // Build startup beacon for predecessor discovery via /resume - address := fmt.Sprintf("%s/crew/%s", r.Name, crewName) - beacon := session.FormatStartupNudge(session.StartupNudgeConfig{ - Recipient: address, - Sender: "human", - Topic: "restart", - }) - claudeCmd := config.BuildCrewStartupCommand(r.Name, crewName, r.Path, beacon) - if err := t.SendKeys(sessionID, claudeCmd); err != nil { - fmt.Printf(" %s %s/%s restart failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err) - } else { - fmt.Printf(" %s %s/%s Claude restarted\n", style.Bold.Render("✓"), r.Name, crewName) - startedAny = true - } - } else { - fmt.Printf(" %s %s/%s already running\n", style.Dim.Render("○"), r.Name, crewName) - } - } else { - if err := startCrewMember(r.Name, crewName, townRoot); err != nil { - fmt.Printf(" %s %s/%s failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err) - } else { - fmt.Printf(" %s %s/%s started\n", style.Bold.Render("✓"), r.Name, crewName) + wg.Add(1) + go func(r *rig.Rig, crewName string) { + defer wg.Done() + msg, started := startOrRestartCrewMember(t, r, crewName, townRoot) + mu.Lock() + fmt.Print(msg) + if started { startedAny = true } - } + mu.Unlock() + }(r, crewName) } } + wg.Wait() + if !startedAny { fmt.Printf(" %s No crew configured or all already running\n", style.Dim.Render("○")) } } +// startRigAgentsParallel starts witness and refinery for all rigs in parallel. +// Uses the provided mutex for synchronized output with other parallel operations. +func startRigAgentsParallel(t *tmux.Tmux, townRoot string, mu *sync.Mutex) { + rigs, err := discoverAllRigs(townRoot) + if err != nil { + mu.Lock() + fmt.Printf(" %s Could not discover rigs: %v\n", style.Dim.Render("○"), err) + mu.Unlock() + return + } + + var wg sync.WaitGroup + + for _, r := range rigs { + wg.Add(2) // Witness + Refinery + + // Start Witness in goroutine + go func(r *rig.Rig) { + defer wg.Done() + msg := startWitnessForRig(t, r) + mu.Lock() + fmt.Print(msg) + mu.Unlock() + }(r) + + // Start Refinery in goroutine + go func(r *rig.Rig) { + defer wg.Done() + msg := startRefineryForRig(r) + mu.Lock() + fmt.Print(msg) + mu.Unlock() + }(r) + } + + wg.Wait() +} + +// startConfiguredCrewParallel starts crew members configured in rig settings in parallel. +// Uses the provided mutex for synchronized output with other parallel operations. +func startConfiguredCrewParallel(t *tmux.Tmux, townRoot string, mu *sync.Mutex) { + rigs, err := discoverAllRigs(townRoot) + if err != nil { + mu.Lock() + fmt.Printf(" %s Could not discover rigs: %v\n", style.Dim.Render("○"), err) + mu.Unlock() + return + } + + var wg sync.WaitGroup + startedAny := false + var startedMu sync.Mutex // Protects startedAny + + for _, r := range rigs { + crewToStart := getCrewToStart(r) + for _, crewName := range crewToStart { + wg.Add(1) + go func(r *rig.Rig, crewName string) { + defer wg.Done() + msg, started := startOrRestartCrewMember(t, r, crewName, townRoot) + mu.Lock() + fmt.Print(msg) + mu.Unlock() + if started { + startedMu.Lock() + startedAny = true + startedMu.Unlock() + } + }(r, crewName) + } + } + + wg.Wait() + + if !startedAny { + mu.Lock() + fmt.Printf(" %s No crew configured or all already running\n", style.Dim.Render("○")) + mu.Unlock() + } +} + +// startOrRestartCrewMember starts or restarts a single crew member and returns a status message. +func startOrRestartCrewMember(t *tmux.Tmux, r *rig.Rig, crewName, townRoot string) (msg string, started bool) { + sessionID := crewSessionName(r.Name, crewName) + if running, _ := t.HasSession(sessionID); running { + // Session exists - check if Claude is still running + agentCfg := config.ResolveAgentConfig(townRoot, r.Path) + if !t.IsAgentRunning(sessionID, config.ExpectedPaneCommands(agentCfg)...) { + // Claude has exited, restart it + // Build startup beacon for predecessor discovery via /resume + address := fmt.Sprintf("%s/crew/%s", r.Name, crewName) + beacon := session.FormatStartupNudge(session.StartupNudgeConfig{ + Recipient: address, + Sender: "human", + Topic: "restart", + }) + claudeCmd := config.BuildCrewStartupCommand(r.Name, crewName, r.Path, beacon) + if err := t.SendKeys(sessionID, claudeCmd); err != nil { + return fmt.Sprintf(" %s %s/%s restart failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err), false + } + return fmt.Sprintf(" %s %s/%s Claude restarted\n", style.Bold.Render("✓"), r.Name, crewName), true + } + return fmt.Sprintf(" %s %s/%s already running\n", style.Dim.Render("○"), r.Name, crewName), false + } + + if err := startCrewMember(r.Name, crewName, townRoot); err != nil { + return fmt.Sprintf(" %s %s/%s failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err), false + } + return fmt.Sprintf(" %s %s/%s started\n", style.Bold.Render("✓"), r.Name, crewName), true +} + // discoverAllRigs finds all rigs in the workspace. func discoverAllRigs(townRoot string) ([]*rig.Rig, error) { rigsConfigPath := filepath.Join(townRoot, "mayor", "rigs.json")