feat(start): parallelize agent startup for faster boot
Start Mayor, Deacon, rig agents, and crew all in parallel rather than sequentially. This reduces worst-case startup from N*60s to ~60s since all agents can start concurrently.

Closes gt-dgbwk

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@@ -165,23 +166,46 @@ func runStart(cmd *cobra.Command, args []string) error {
|
||||
t := tmux.NewTmux()
|
||||
|
||||
fmt.Printf("Starting Gas Town from %s\n\n", style.Dim.Render(townRoot))
|
||||
|
||||
// Start core agents (Mayor and Deacon)
|
||||
if err := startCoreAgents(townRoot, startAgentOverride); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If --all, start witnesses and refineries for all rigs
|
||||
if startAll {
|
||||
fmt.Println()
|
||||
fmt.Println("Starting rig agents...")
|
||||
startRigAgents(t, townRoot)
|
||||
}
|
||||
|
||||
// Auto-start configured crew for each rig
|
||||
fmt.Println("Starting all agents in parallel...")
|
||||
fmt.Println()
|
||||
fmt.Println("Starting configured crew...")
|
||||
startConfiguredCrew(t, townRoot)
|
||||
|
||||
// Start all agent groups in parallel for maximum speed
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex // Protects stdout
|
||||
var coreErr error
|
||||
|
||||
// Start core agents (Mayor and Deacon) in background
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := startCoreAgentsParallel(townRoot, startAgentOverride, &mu); err != nil {
|
||||
mu.Lock()
|
||||
coreErr = err
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
// Start rig agents (witnesses, refineries) if --all
|
||||
if startAll {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
startRigAgentsParallel(t, townRoot, &mu)
|
||||
}()
|
||||
}
|
||||
|
||||
// Start configured crew
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
startConfiguredCrewParallel(t, townRoot, &mu)
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if coreErr != nil {
|
||||
return coreErr
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
fmt.Printf("%s Gas Town is running\n", style.Bold.Render("✓"))
|
||||
@@ -193,36 +217,72 @@ func runStart(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// startCoreAgents starts Mayor and Deacon sessions using the Manager pattern.
|
||||
func startCoreAgents(townRoot string, agentOverride string) error {
|
||||
// Start Mayor first (so Deacon sees it as up)
|
||||
mayorMgr := mayor.NewManager(townRoot)
|
||||
if err := mayorMgr.Start(agentOverride); err != nil {
|
||||
if err == mayor.ErrAlreadyRunning {
|
||||
fmt.Printf(" %s Mayor already running\n", style.Dim.Render("○"))
|
||||
} else {
|
||||
return fmt.Errorf("starting Mayor: %w", err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %s Mayor started\n", style.Bold.Render("✓"))
|
||||
}
|
||||
// startCoreAgentsParallel starts Mayor and Deacon sessions in parallel using the Manager pattern.
|
||||
// The mutex is used to synchronize output with other parallel startup operations.
|
||||
func startCoreAgentsParallel(townRoot string, agentOverride string, mu *sync.Mutex) error {
|
||||
var wg sync.WaitGroup
|
||||
var firstErr error
|
||||
var errMu sync.Mutex
|
||||
|
||||
// Start Deacon (health monitor)
|
||||
deaconMgr := deacon.NewManager(townRoot)
|
||||
if err := deaconMgr.Start(agentOverride); err != nil {
|
||||
if err == deacon.ErrAlreadyRunning {
|
||||
fmt.Printf(" %s Deacon already running\n", style.Dim.Render("○"))
|
||||
// Start Mayor in goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
mayorMgr := mayor.NewManager(townRoot)
|
||||
if err := mayorMgr.Start(agentOverride); err != nil {
|
||||
if err == mayor.ErrAlreadyRunning {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Mayor already running\n", style.Dim.Render("○"))
|
||||
mu.Unlock()
|
||||
} else {
|
||||
errMu.Lock()
|
||||
if firstErr == nil {
|
||||
firstErr = fmt.Errorf("starting Mayor: %w", err)
|
||||
}
|
||||
errMu.Unlock()
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Mayor failed: %v\n", style.Dim.Render("○"), err)
|
||||
mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("starting Deacon: %w", err)
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Mayor started\n", style.Bold.Render("✓"))
|
||||
mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %s Deacon started\n", style.Bold.Render("✓"))
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
// Start Deacon in goroutine
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
deaconMgr := deacon.NewManager(townRoot)
|
||||
if err := deaconMgr.Start(agentOverride); err != nil {
|
||||
if err == deacon.ErrAlreadyRunning {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Deacon already running\n", style.Dim.Render("○"))
|
||||
mu.Unlock()
|
||||
} else {
|
||||
errMu.Lock()
|
||||
if firstErr == nil {
|
||||
firstErr = fmt.Errorf("starting Deacon: %w", err)
|
||||
}
|
||||
errMu.Unlock()
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Deacon failed: %v\n", style.Dim.Render("○"), err)
|
||||
mu.Unlock()
|
||||
}
|
||||
} else {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Deacon started\n", style.Bold.Render("✓"))
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
return firstErr
|
||||
}
|
||||
|
||||
// startRigAgents starts witness and refinery for all rigs.
|
||||
// startRigAgents starts witness and refinery for all rigs in parallel.
|
||||
// Called when --all flag is passed to gt start.
|
||||
func startRigAgents(t *tmux.Tmux, townRoot string) {
|
||||
rigs, err := discoverAllRigs(townRoot)
|
||||
@@ -231,40 +291,65 @@ func startRigAgents(t *tmux.Tmux, townRoot string) {
|
||||
return
|
||||
}
|
||||
|
||||
for _, r := range rigs {
|
||||
// Start Witness
|
||||
witnessSession := fmt.Sprintf("gt-%s-witness", r.Name)
|
||||
witnessRunning, _ := t.HasSession(witnessSession)
|
||||
if witnessRunning {
|
||||
fmt.Printf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name)
|
||||
} else {
|
||||
witMgr := witness.NewManager(r)
|
||||
if err := witMgr.Start(false, "", nil); err != nil {
|
||||
if err == witness.ErrAlreadyRunning {
|
||||
fmt.Printf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name)
|
||||
} else {
|
||||
fmt.Printf(" %s %s witness failed: %v\n", style.Dim.Render("○"), r.Name, err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %s %s witness started\n", style.Bold.Render("✓"), r.Name)
|
||||
}
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex // Protects stdout
|
||||
|
||||
// Start Refinery
|
||||
refineryMgr := refinery.NewManager(r)
|
||||
if err := refineryMgr.Start(false); err != nil {
|
||||
if errors.Is(err, refinery.ErrAlreadyRunning) {
|
||||
fmt.Printf(" %s %s refinery already running\n", style.Dim.Render("○"), r.Name)
|
||||
} else {
|
||||
fmt.Printf(" %s %s refinery failed: %v\n", style.Dim.Render("○"), r.Name, err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %s %s refinery started\n", style.Bold.Render("✓"), r.Name)
|
||||
}
|
||||
for _, r := range rigs {
|
||||
wg.Add(2) // Witness + Refinery
|
||||
|
||||
// Start Witness in goroutine
|
||||
go func(r *rig.Rig) {
|
||||
defer wg.Done()
|
||||
msg := startWitnessForRig(t, r)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
mu.Unlock()
|
||||
}(r)
|
||||
|
||||
// Start Refinery in goroutine
|
||||
go func(r *rig.Rig) {
|
||||
defer wg.Done()
|
||||
msg := startRefineryForRig(r)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
mu.Unlock()
|
||||
}(r)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// startConfiguredCrew starts crew members configured in rig settings.
|
||||
// startWitnessForRig starts the witness for a single rig and returns a status message.
|
||||
func startWitnessForRig(t *tmux.Tmux, r *rig.Rig) string {
|
||||
witnessSession := fmt.Sprintf("gt-%s-witness", r.Name)
|
||||
witnessRunning, _ := t.HasSession(witnessSession)
|
||||
if witnessRunning {
|
||||
return fmt.Sprintf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name)
|
||||
}
|
||||
|
||||
witMgr := witness.NewManager(r)
|
||||
if err := witMgr.Start(false, "", nil); err != nil {
|
||||
if err == witness.ErrAlreadyRunning {
|
||||
return fmt.Sprintf(" %s %s witness already running\n", style.Dim.Render("○"), r.Name)
|
||||
}
|
||||
return fmt.Sprintf(" %s %s witness failed: %v\n", style.Dim.Render("○"), r.Name, err)
|
||||
}
|
||||
return fmt.Sprintf(" %s %s witness started\n", style.Bold.Render("✓"), r.Name)
|
||||
}
|
||||
|
||||
// startRefineryForRig starts the refinery for a single rig and returns a status message.
|
||||
func startRefineryForRig(r *rig.Rig) string {
|
||||
refineryMgr := refinery.NewManager(r)
|
||||
if err := refineryMgr.Start(false); err != nil {
|
||||
if errors.Is(err, refinery.ErrAlreadyRunning) {
|
||||
return fmt.Sprintf(" %s %s refinery already running\n", style.Dim.Render("○"), r.Name)
|
||||
}
|
||||
return fmt.Sprintf(" %s %s refinery failed: %v\n", style.Dim.Render("○"), r.Name, err)
|
||||
}
|
||||
return fmt.Sprintf(" %s %s refinery started\n", style.Bold.Render("✓"), r.Name)
|
||||
}
|
||||
|
||||
// startConfiguredCrew starts crew members configured in rig settings in parallel.
|
||||
func startConfiguredCrew(t *tmux.Tmux, townRoot string) {
|
||||
rigs, err := discoverAllRigs(townRoot)
|
||||
if err != nil {
|
||||
@@ -272,50 +357,145 @@ func startConfiguredCrew(t *tmux.Tmux, townRoot string) {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex // Protects stdout and startedAny
|
||||
startedAny := false
|
||||
|
||||
for _, r := range rigs {
|
||||
crewToStart := getCrewToStart(r)
|
||||
for _, crewName := range crewToStart {
|
||||
sessionID := crewSessionName(r.Name, crewName)
|
||||
if running, _ := t.HasSession(sessionID); running {
|
||||
// Session exists - check if Claude is still running
|
||||
agentCfg := config.ResolveAgentConfig(townRoot, r.Path)
|
||||
if !t.IsAgentRunning(sessionID, config.ExpectedPaneCommands(agentCfg)...) {
|
||||
// Claude has exited, restart it
|
||||
fmt.Printf(" %s %s/%s session exists, restarting Claude...\n", style.Dim.Render("○"), r.Name, crewName)
|
||||
// Build startup beacon for predecessor discovery via /resume
|
||||
address := fmt.Sprintf("%s/crew/%s", r.Name, crewName)
|
||||
beacon := session.FormatStartupNudge(session.StartupNudgeConfig{
|
||||
Recipient: address,
|
||||
Sender: "human",
|
||||
Topic: "restart",
|
||||
})
|
||||
claudeCmd := config.BuildCrewStartupCommand(r.Name, crewName, r.Path, beacon)
|
||||
if err := t.SendKeys(sessionID, claudeCmd); err != nil {
|
||||
fmt.Printf(" %s %s/%s restart failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err)
|
||||
} else {
|
||||
fmt.Printf(" %s %s/%s Claude restarted\n", style.Bold.Render("✓"), r.Name, crewName)
|
||||
startedAny = true
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %s %s/%s already running\n", style.Dim.Render("○"), r.Name, crewName)
|
||||
}
|
||||
} else {
|
||||
if err := startCrewMember(r.Name, crewName, townRoot); err != nil {
|
||||
fmt.Printf(" %s %s/%s failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err)
|
||||
} else {
|
||||
fmt.Printf(" %s %s/%s started\n", style.Bold.Render("✓"), r.Name, crewName)
|
||||
wg.Add(1)
|
||||
go func(r *rig.Rig, crewName string) {
|
||||
defer wg.Done()
|
||||
msg, started := startOrRestartCrewMember(t, r, crewName, townRoot)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
if started {
|
||||
startedAny = true
|
||||
}
|
||||
}
|
||||
mu.Unlock()
|
||||
}(r, crewName)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if !startedAny {
|
||||
fmt.Printf(" %s No crew configured or all already running\n", style.Dim.Render("○"))
|
||||
}
|
||||
}
|
||||
|
||||
// startRigAgentsParallel starts witness and refinery for all rigs in parallel.
|
||||
// Uses the provided mutex for synchronized output with other parallel operations.
|
||||
func startRigAgentsParallel(t *tmux.Tmux, townRoot string, mu *sync.Mutex) {
|
||||
rigs, err := discoverAllRigs(townRoot)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Could not discover rigs: %v\n", style.Dim.Render("○"), err)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, r := range rigs {
|
||||
wg.Add(2) // Witness + Refinery
|
||||
|
||||
// Start Witness in goroutine
|
||||
go func(r *rig.Rig) {
|
||||
defer wg.Done()
|
||||
msg := startWitnessForRig(t, r)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
mu.Unlock()
|
||||
}(r)
|
||||
|
||||
// Start Refinery in goroutine
|
||||
go func(r *rig.Rig) {
|
||||
defer wg.Done()
|
||||
msg := startRefineryForRig(r)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
mu.Unlock()
|
||||
}(r)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// startConfiguredCrewParallel starts crew members configured in rig settings in parallel.
|
||||
// Uses the provided mutex for synchronized output with other parallel operations.
|
||||
func startConfiguredCrewParallel(t *tmux.Tmux, townRoot string, mu *sync.Mutex) {
|
||||
rigs, err := discoverAllRigs(townRoot)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s Could not discover rigs: %v\n", style.Dim.Render("○"), err)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
startedAny := false
|
||||
var startedMu sync.Mutex // Protects startedAny
|
||||
|
||||
for _, r := range rigs {
|
||||
crewToStart := getCrewToStart(r)
|
||||
for _, crewName := range crewToStart {
|
||||
wg.Add(1)
|
||||
go func(r *rig.Rig, crewName string) {
|
||||
defer wg.Done()
|
||||
msg, started := startOrRestartCrewMember(t, r, crewName, townRoot)
|
||||
mu.Lock()
|
||||
fmt.Print(msg)
|
||||
mu.Unlock()
|
||||
if started {
|
||||
startedMu.Lock()
|
||||
startedAny = true
|
||||
startedMu.Unlock()
|
||||
}
|
||||
}(r, crewName)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if !startedAny {
|
||||
mu.Lock()
|
||||
fmt.Printf(" %s No crew configured or all already running\n", style.Dim.Render("○"))
|
||||
mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// startOrRestartCrewMember starts or restarts a single crew member and returns a status message.
|
||||
func startOrRestartCrewMember(t *tmux.Tmux, r *rig.Rig, crewName, townRoot string) (msg string, started bool) {
|
||||
sessionID := crewSessionName(r.Name, crewName)
|
||||
if running, _ := t.HasSession(sessionID); running {
|
||||
// Session exists - check if Claude is still running
|
||||
agentCfg := config.ResolveAgentConfig(townRoot, r.Path)
|
||||
if !t.IsAgentRunning(sessionID, config.ExpectedPaneCommands(agentCfg)...) {
|
||||
// Claude has exited, restart it
|
||||
// Build startup beacon for predecessor discovery via /resume
|
||||
address := fmt.Sprintf("%s/crew/%s", r.Name, crewName)
|
||||
beacon := session.FormatStartupNudge(session.StartupNudgeConfig{
|
||||
Recipient: address,
|
||||
Sender: "human",
|
||||
Topic: "restart",
|
||||
})
|
||||
claudeCmd := config.BuildCrewStartupCommand(r.Name, crewName, r.Path, beacon)
|
||||
if err := t.SendKeys(sessionID, claudeCmd); err != nil {
|
||||
return fmt.Sprintf(" %s %s/%s restart failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err), false
|
||||
}
|
||||
return fmt.Sprintf(" %s %s/%s Claude restarted\n", style.Bold.Render("✓"), r.Name, crewName), true
|
||||
}
|
||||
return fmt.Sprintf(" %s %s/%s already running\n", style.Dim.Render("○"), r.Name, crewName), false
|
||||
}
|
||||
|
||||
if err := startCrewMember(r.Name, crewName, townRoot); err != nil {
|
||||
return fmt.Sprintf(" %s %s/%s failed: %v\n", style.Dim.Render("○"), r.Name, crewName, err), false
|
||||
}
|
||||
return fmt.Sprintf(" %s %s/%s started\n", style.Bold.Render("✓"), r.Name, crewName), true
|
||||
}
|
||||
|
||||
// discoverAllRigs finds all rigs in the workspace.
|
||||
func discoverAllRigs(townRoot string) ([]*rig.Rig, error) {
|
||||
rigsConfigPath := filepath.Join(townRoot, "mayor", "rigs.json")
|
||||
|
||||
Reference in New Issue
Block a user