perf(up): parallelize all agent startup with concurrency limit (#476)

* perf(up): parallelize agent startup with worker pool and channel-based collection

- Run daemon, deacon, mayor, and rig prefetch all in parallel (4-way concurrent init)
- Use fixed worker pool instead of goroutine-per-task for bounded concurrency
- Replace mutex-protected maps with channel-based result collection (zero lock contention)
- Pre-allocate maps with known capacity to reduce allocations
- Use string concatenation instead of fmt.Sprintf for display names
- Reduce `gt up` startup time from ~50s to ~10s for towns with multiple rigs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(lint): fix errcheck and misspell issues in orphans.go

- Check error return from fmt.Scanln calls
- Fix "Cancelled" -> "Canceled" spelling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Keith Wyatt
2026-01-13 13:46:50 -08:00
committed by GitHub
parent 69110309cc
commit 7d8d96f7f9
2 changed files with 498 additions and 75 deletions

View File

@@ -6,6 +6,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/spf13/cobra"
@@ -18,12 +19,23 @@ import (
"github.com/steveyegge/gastown/internal/mayor"
"github.com/steveyegge/gastown/internal/polecat"
"github.com/steveyegge/gastown/internal/refinery"
"github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/style"
"github.com/steveyegge/gastown/internal/tmux"
"github.com/steveyegge/gastown/internal/witness"
"github.com/steveyegge/gastown/internal/workspace"
)
// agentStartResult holds the result of starting an agent. Results are built
// on worker goroutines and printed later from a single goroutine, so startup
// output stays ordered even though the starts run in parallel.
type agentStartResult struct {
	name   string // Display name like "Witness (gastown)"
	ok     bool   // Whether start succeeded (an already-running agent counts as success)
	detail string // Status detail (session name on success, error text on failure)
}
// maxConcurrentAgentStarts limits parallel agent startups to avoid resource exhaustion.
// Witness and refinery start tasks share one worker pool bounded by this value.
const maxConcurrentAgentStarts = 10
var upCmd = &cobra.Command{
Use: "up",
GroupID: GroupServices,
@@ -70,89 +82,108 @@ func runUp(cmd *cobra.Command, args []string) error {
allOK := true
// 1. Daemon (Go process)
if err := ensureDaemon(townRoot); err != nil {
printStatus("Daemon", false, err.Error())
allOK = false
} else {
running, pid, _ := daemon.IsRunning(townRoot)
if running {
printStatus("Daemon", true, fmt.Sprintf("PID %d", pid))
}
}
// 2. Deacon (Claude agent)
deaconMgr := deacon.NewManager(townRoot)
if err := deaconMgr.Start(""); err != nil {
if err == deacon.ErrAlreadyRunning {
printStatus("Deacon", true, deaconMgr.SessionName())
} else {
printStatus("Deacon", false, err.Error())
allOK = false
}
} else {
printStatus("Deacon", true, deaconMgr.SessionName())
}
// 3. Mayor (Claude agent)
mayorMgr := mayor.NewManager(townRoot)
if err := mayorMgr.Start(""); err != nil {
if err == mayor.ErrAlreadyRunning {
printStatus("Mayor", true, mayorMgr.SessionName())
} else {
printStatus("Mayor", false, err.Error())
allOK = false
}
} else {
printStatus("Mayor", true, mayorMgr.SessionName())
}
// 4. Witnesses (one per rig)
// Discover rigs early so we can prefetch while daemon/deacon/mayor start
rigs := discoverRigs(townRoot)
for _, rigName := range rigs {
_, r, err := getRig(rigName)
if err != nil {
printStatus(fmt.Sprintf("Witness (%s)", rigName), false, err.Error())
allOK = false
continue
}
mgr := witness.NewManager(r)
if err := mgr.Start(false, "", nil); err != nil {
if err == witness.ErrAlreadyRunning {
printStatus(fmt.Sprintf("Witness (%s)", rigName), true, mgr.SessionName())
// Start daemon, deacon, mayor, and rig prefetch in parallel
var daemonErr error
var daemonPID int
var deaconResult, mayorResult agentStartResult
var prefetchedRigs map[string]*rig.Rig
var rigErrors map[string]error
var startupWg sync.WaitGroup
startupWg.Add(4)
// 1. Daemon (Go process)
go func() {
defer startupWg.Done()
if err := ensureDaemon(townRoot); err != nil {
daemonErr = err
} else {
running, pid, _ := daemon.IsRunning(townRoot)
if running {
daemonPID = pid
}
}
}()
// 2. Deacon
go func() {
defer startupWg.Done()
deaconMgr := deacon.NewManager(townRoot)
if err := deaconMgr.Start(""); err != nil {
if err == deacon.ErrAlreadyRunning {
deaconResult = agentStartResult{name: "Deacon", ok: true, detail: deaconMgr.SessionName()}
} else {
printStatus(fmt.Sprintf("Witness (%s)", rigName), false, err.Error())
allOK = false
deaconResult = agentStartResult{name: "Deacon", ok: false, detail: err.Error()}
}
} else {
printStatus(fmt.Sprintf("Witness (%s)", rigName), true, mgr.SessionName())
deaconResult = agentStartResult{name: "Deacon", ok: true, detail: deaconMgr.SessionName()}
}
}()
// 3. Mayor
go func() {
defer startupWg.Done()
mayorMgr := mayor.NewManager(townRoot)
if err := mayorMgr.Start(""); err != nil {
if err == mayor.ErrAlreadyRunning {
mayorResult = agentStartResult{name: "Mayor", ok: true, detail: mayorMgr.SessionName()}
} else {
mayorResult = agentStartResult{name: "Mayor", ok: false, detail: err.Error()}
}
} else {
mayorResult = agentStartResult{name: "Mayor", ok: true, detail: mayorMgr.SessionName()}
}
}()
// 4. Prefetch rig configs (overlaps with daemon/deacon/mayor startup)
go func() {
defer startupWg.Done()
prefetchedRigs, rigErrors = prefetchRigs(rigs)
}()
startupWg.Wait()
// Print daemon/deacon/mayor results
if daemonErr != nil {
printStatus("Daemon", false, daemonErr.Error())
allOK = false
} else if daemonPID > 0 {
printStatus("Daemon", true, fmt.Sprintf("PID %d", daemonPID))
}
printStatus(deaconResult.name, deaconResult.ok, deaconResult.detail)
if !deaconResult.ok {
allOK = false
}
printStatus(mayorResult.name, mayorResult.ok, mayorResult.detail)
if !mayorResult.ok {
allOK = false
}
// 5 & 6. Witnesses and Refineries (using prefetched rigs)
witnessResults, refineryResults := startRigAgentsWithPrefetch(rigs, prefetchedRigs, rigErrors)
// Print results in order: all witnesses first, then all refineries
for _, rigName := range rigs {
if result, ok := witnessResults[rigName]; ok {
printStatus(result.name, result.ok, result.detail)
if !result.ok {
allOK = false
}
}
}
for _, rigName := range rigs {
if result, ok := refineryResults[rigName]; ok {
printStatus(result.name, result.ok, result.detail)
if !result.ok {
allOK = false
}
}
}
// 5. Refineries (one per rig)
for _, rigName := range rigs {
_, r, err := getRig(rigName)
if err != nil {
printStatus(fmt.Sprintf("Refinery (%s)", rigName), false, err.Error())
allOK = false
continue
}
mgr := refinery.NewManager(r)
if err := mgr.Start(false, ""); err != nil {
if err == refinery.ErrAlreadyRunning {
printStatus(fmt.Sprintf("Refinery (%s)", rigName), true, mgr.SessionName())
} else {
printStatus(fmt.Sprintf("Refinery (%s)", rigName), false, err.Error())
allOK = false
}
} else {
printStatus(fmt.Sprintf("Refinery (%s)", rigName), true, mgr.SessionName())
}
}
// 6. Crew (if --restore)
// 7. Crew (if --restore)
if upRestore {
for _, rigName := range rigs {
crewStarted, crewErrors := startCrewFromSettings(townRoot, rigName)
@@ -249,6 +280,182 @@ func ensureDaemon(townRoot string) error {
return nil
}
// rigPrefetchResult holds the result of loading a single rig config.
// index points back into the caller's rigNames slice so that results
// arriving out of order on a channel can be matched to their rig name.
type rigPrefetchResult struct {
	index int      // position in the rigNames slice passed to prefetchRigs
	rig   *rig.Rig // loaded rig config; only meaningful when err is nil
	err   error    // non-nil if loading this rig's config failed
}
// prefetchRigs loads all rig configs in parallel for faster agent startup.
// Returns a map of rig name to loaded Rig, and a map of rig name to load error.
// A rig appears in exactly one of the two maps.
func prefetchRigs(rigNames []string) (map[string]*rig.Rig, map[string]error) {
	count := len(rigNames)
	if count == 0 {
		return make(map[string]*rig.Rig), make(map[string]error)
	}

	// One goroutine per rig; a buffered channel collects results lock-free.
	resultCh := make(chan rigPrefetchResult, count)
	for idx, rigName := range rigNames {
		go func(i int, name string) {
			_, loaded, loadErr := getRig(name)
			resultCh <- rigPrefetchResult{index: i, rig: loaded, err: loadErr}
		}(idx, rigName)
	}

	// Drain exactly count results; pre-size the success map for the common case.
	loaded := make(map[string]*rig.Rig, count)
	failures := make(map[string]error)
	for received := 0; received < count; received++ {
		res := <-resultCh
		rigName := rigNames[res.index]
		if res.err != nil {
			failures[rigName] = res.err
		} else {
			loaded[rigName] = res.rig
		}
	}
	return loaded, failures
}
// agentTask represents a unit of work for the agent worker pool:
// start one agent (witness or refinery) for one rig.
type agentTask struct {
	rigName   string   // name of the rig the agent belongs to
	rigObj    *rig.Rig // pre-loaded rig config handed to the agent manager
	isWitness bool     // true for witness, false for refinery
}
// agentResultMsg carries result back from worker to collector.
// rigName and isWitness together identify which result-map entry to fill.
type agentResultMsg struct {
	rigName   string           // key into the witness/refinery result map
	isWitness bool             // selects which of the two result maps receives result
	result    agentStartResult // outcome of the start attempt
}
// startRigAgentsParallel starts all Witnesses and Refineries concurrently,
// loading each rig's config itself first. Use this entry point when the rig
// configs have not already been prefetched.
func startRigAgentsParallel(rigNames []string) (witnessResults, refineryResults map[string]agentStartResult) {
	loaded, loadErrs := prefetchRigs(rigNames)
	witnessResults, refineryResults = startRigAgentsWithPrefetch(rigNames, loaded, loadErrs)
	return
}
// startRigAgentsWithPrefetch starts all Witnesses and Refineries using pre-loaded
// rig configs. A fixed worker pool (at most maxConcurrentAgentStarts goroutines)
// bounds concurrency; results are collected on a single goroutine so the result
// maps need no locking. Rigs present in rigErrors get failure entries in both maps.
func startRigAgentsWithPrefetch(rigNames []string, prefetchedRigs map[string]*rig.Rig, rigErrors map[string]error) (witnessResults, refineryResults map[string]agentStartResult) {
	total := len(rigNames)
	witnessResults = make(map[string]agentStartResult, total)
	refineryResults = make(map[string]agentStartResult, total)
	if total == 0 {
		return
	}

	// Rigs whose config failed to load are reported as failures for both agents.
	for failedRig, loadErr := range rigErrors {
		detail := loadErr.Error()
		witnessResults[failedRig] = agentStartResult{
			name:   "Witness (" + failedRig + ")",
			ok:     false,
			detail: detail,
		}
		refineryResults[failedRig] = agentStartResult{
			name:   "Refinery (" + failedRig + ")",
			ok:     false,
			detail: detail,
		}
	}

	taskCount := 2 * len(prefetchedRigs) // one witness plus one refinery per loaded rig
	if taskCount == 0 {
		return
	}

	// Both channels are buffered to taskCount, so every task can be enqueued
	// up front and no worker ever blocks on the result channel.
	taskCh := make(chan agentTask, taskCount)
	resultCh := make(chan agentResultMsg, taskCount)
	for loadedRig, cfg := range prefetchedRigs {
		taskCh <- agentTask{rigName: loadedRig, rigObj: cfg, isWitness: true}
		taskCh <- agentTask{rigName: loadedRig, rigObj: cfg, isWitness: false}
	}
	close(taskCh)

	// Fixed-size worker pool, never larger than the number of tasks.
	workers := maxConcurrentAgentStarts
	if taskCount < workers {
		workers = taskCount
	}
	var pool sync.WaitGroup
	pool.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer pool.Done()
			for job := range taskCh {
				var res agentStartResult
				if job.isWitness {
					res = upStartWitness(job.rigName, job.rigObj)
				} else {
					res = upStartRefinery(job.rigName, job.rigObj)
				}
				resultCh <- agentResultMsg{rigName: job.rigName, isWitness: job.isWitness, result: res}
			}
		}()
	}

	// Close resultCh once every worker has drained its tasks.
	go func() {
		pool.Wait()
		close(resultCh)
	}()

	// Single collector goroutine: no locks needed on the result maps.
	for msg := range resultCh {
		dest := refineryResults
		if msg.isWitness {
			dest = witnessResults
		}
		dest[msg.rigName] = msg.result
	}
	return
}
// upStartWitness starts a witness for the given rig and returns a result struct.
// An ErrAlreadyRunning error is reported as success, since the desired end state
// (a running witness) already holds.
func upStartWitness(rigName string, r *rig.Rig) agentStartResult {
	displayName := "Witness (" + rigName + ")"
	mgr := witness.NewManager(r)
	err := mgr.Start(false, "", nil)
	if err != nil && err != witness.ErrAlreadyRunning {
		return agentStartResult{name: displayName, ok: false, detail: err.Error()}
	}
	return agentStartResult{name: displayName, ok: true, detail: mgr.SessionName()}
}
// upStartRefinery starts a refinery for the given rig and returns a result struct.
// An ErrAlreadyRunning error is reported as success, since the desired end state
// (a running refinery) already holds.
func upStartRefinery(rigName string, r *rig.Rig) agentStartResult {
	displayName := "Refinery (" + rigName + ")"
	mgr := refinery.NewManager(r)
	err := mgr.Start(false, "")
	if err != nil && err != refinery.ErrAlreadyRunning {
		return agentStartResult{name: displayName, ok: false, detail: err.Error()}
	}
	return agentStartResult{name: displayName, ok: true, detail: mgr.SessionName()}
}
// discoverRigs finds all rigs in the town.
func discoverRigs(townRoot string) []string {
var rigs []string

216
internal/cmd/up_test.go Normal file
View File

@@ -0,0 +1,216 @@
package cmd
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/steveyegge/gastown/internal/rig"
)
// TestAgentStartResult_Fields verifies that agentStartResult stores its
// fields without transformation.
func TestAgentStartResult_Fields(t *testing.T) {
	got := agentStartResult{
		name:   "Witness (gastown)",
		ok:     true,
		detail: "gt-gastown-witness",
	}
	if want := "Witness (gastown)"; got.name != want {
		t.Errorf("name = %q, want %q", got.name, want)
	}
	if !got.ok {
		t.Error("ok should be true")
	}
	if want := "gt-gastown-witness"; got.detail != want {
		t.Errorf("detail = %q, want %q", got.detail, want)
	}
}
// TestMaxConcurrentAgentStarts_Constant verifies the concurrency limit is set
// to a reasonable value: at least one worker, but not so many that parallel
// agent starts could exhaust system resources.
func TestMaxConcurrentAgentStarts_Constant(t *testing.T) {
	switch {
	case maxConcurrentAgentStarts < 1:
		t.Errorf("maxConcurrentAgentStarts = %d, should be >= 1", maxConcurrentAgentStarts)
	case maxConcurrentAgentStarts > 100:
		t.Errorf("maxConcurrentAgentStarts = %d, should be <= 100 to prevent resource exhaustion", maxConcurrentAgentStarts)
	}
}
// TestSemaphoreLimitsConcurrency checks that a buffered-channel semaphore
// keeps the number of simultaneously running tasks at or below its capacity.
func TestSemaphoreLimitsConcurrency(t *testing.T) {
	const maxConcurrent = 3
	const totalTasks = 10

	var (
		wg          sync.WaitGroup
		inFlight    int32
		maxObserved int32
	)
	sem := make(chan struct{}, maxConcurrent)

	wg.Add(totalTasks)
	for i := 0; i < totalTasks; i++ {
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release on exit

			running := atomic.AddInt32(&inFlight, 1)
			defer atomic.AddInt32(&inFlight, -1)

			// Record the highest concurrency level seen (CAS retry loop).
			for {
				seen := atomic.LoadInt32(&maxObserved)
				if running <= seen || atomic.CompareAndSwapInt32(&maxObserved, seen, running) {
					break
				}
			}

			time.Sleep(10 * time.Millisecond) // simulate work
		}()
	}
	wg.Wait()

	if maxObserved > maxConcurrent {
		t.Errorf("max concurrent = %d, should not exceed %d", maxObserved, maxConcurrent)
	}
}
// TestStartRigAgentsParallel_EmptyRigs verifies that an empty rig list yields
// empty result maps without error or panic.
func TestStartRigAgentsParallel_EmptyRigs(t *testing.T) {
	gotWitness, gotRefinery := startRigAgentsParallel([]string{})
	if n := len(gotWitness); n != 0 {
		t.Errorf("witnessResults should be empty, got %d entries", n)
	}
	if n := len(gotRefinery); n != 0 {
		t.Errorf("refineryResults should be empty, got %d entries", n)
	}
}
// TestStartRigAgentsWithPrefetch_EmptyRigs verifies that empty inputs yield
// empty result maps.
func TestStartRigAgentsWithPrefetch_EmptyRigs(t *testing.T) {
	gotWitness, gotRefinery := startRigAgentsWithPrefetch(
		[]string{},
		make(map[string]*rig.Rig),
		make(map[string]error),
	)
	if n := len(gotWitness); n != 0 {
		t.Errorf("witnessResults should be empty, got %d entries", n)
	}
	if n := len(gotRefinery); n != 0 {
		t.Errorf("refineryResults should be empty, got %d entries", n)
	}
}
// TestStartRigAgentsWithPrefetch_RecordsErrors verifies that a rig whose
// config failed to load gets a failure entry in both result maps.
func TestStartRigAgentsWithPrefetch_RecordsErrors(t *testing.T) {
	loadErrs := map[string]error{
		"badrig": fmt.Errorf("rig not found"),
	}
	gotWitness, gotRefinery := startRigAgentsWithPrefetch(
		[]string{"badrig"},
		make(map[string]*rig.Rig),
		loadErrs,
	)

	if n := len(gotWitness); n != 1 {
		t.Errorf("witnessResults should have 1 entry, got %d", n)
	}
	switch res, found := gotWitness["badrig"]; {
	case !found:
		t.Error("witnessResults should have badrig entry")
	case res.ok:
		t.Error("badrig witness result should not be ok")
	}

	if n := len(gotRefinery); n != 1 {
		t.Errorf("refineryResults should have 1 entry, got %d", n)
	}
	switch res, found := gotRefinery["badrig"]; {
	case !found:
		t.Error("refineryResults should have badrig entry")
	case res.ok:
		t.Error("badrig refinery result should not be ok")
	}
}
// TestPrefetchRigs_Empty verifies that prefetching an empty rig list returns
// two empty maps.
func TestPrefetchRigs_Empty(t *testing.T) {
	gotRigs, gotErrs := prefetchRigs([]string{})
	if n := len(gotRigs); n != 0 {
		t.Errorf("rigs should be empty, got %d entries", n)
	}
	if n := len(gotErrs); n != 0 {
		t.Errorf("errors should be empty, got %d entries", n)
	}
}
// TestWorkerPoolLimitsConcurrency checks that a fixed worker pool never runs
// more tasks at once than it has workers, and that every task completes.
func TestWorkerPoolLimitsConcurrency(t *testing.T) {
	const numWorkers = 3
	const numTasks = 15

	var (
		wg          sync.WaitGroup
		inFlight    int32
		maxObserved int32
	)
	taskCh := make(chan int, numTasks)
	resultCh := make(chan int, numTasks)

	// Fixed pool of workers draining the task channel.
	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func() {
			defer wg.Done()
			for range taskCh {
				running := atomic.AddInt32(&inFlight, 1)
				// Record the highest concurrency level seen (CAS retry loop).
				for {
					seen := atomic.LoadInt32(&maxObserved)
					if running <= seen || atomic.CompareAndSwapInt32(&maxObserved, seen, running) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond) // simulate work
				atomic.AddInt32(&inFlight, -1)
				resultCh <- 1
			}
		}()
	}

	// Enqueue every task, then signal no more work is coming.
	for i := 0; i < numTasks; i++ {
		taskCh <- i
	}
	close(taskCh)

	// Close the result channel once all workers have finished.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	completed := 0
	for range resultCh {
		completed++
	}

	if completed != numTasks {
		t.Errorf("expected %d results, got %d", numTasks, completed)
	}
	if maxObserved > numWorkers {
		t.Errorf("max concurrent = %d, should not exceed %d workers", maxObserved, numWorkers)
	}
}