perf: Batch and parallelize convoy worker lookups (gt-a40d8)
Optimize getWorkersForIssues() from O(N×R) to O(R) subprocess calls: - Batch sqlite queries per rig using WHERE hook_bead IN (...) - Parallelize rig lookups with goroutines Expected improvement: 300-600ms → 50-100ms for moderate convoys 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
tea "github.com/charmbracelet/bubbletea"
|
||||
@@ -1212,64 +1213,98 @@ type workerInfo struct {
|
||||
|
||||
// getWorkersForIssues finds workers currently assigned to the given issues.
|
||||
// Returns a map from issue ID to worker info.
|
||||
//
|
||||
// Optimized to batch queries per rig (O(R) instead of O(N×R)) and
|
||||
// parallelize across rigs.
|
||||
func getWorkersForIssues(issueIDs []string) map[string]*workerInfo {
|
||||
result := make(map[string]*workerInfo)
|
||||
if len(issueIDs) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
// Query agent beads where hook_bead matches one of our issues
|
||||
// We need to check beads across all rigs, so query each potential rig
|
||||
|
||||
// Find town root
|
||||
townRoot, err := workspace.FindFromCwd()
|
||||
if err != nil || townRoot == "" {
|
||||
return result
|
||||
}
|
||||
|
||||
// Discover rigs
|
||||
// Discover rigs with beads databases
|
||||
rigDirs, _ := filepath.Glob(filepath.Join(townRoot, "*", "polecats"))
|
||||
var beadsDBS []string
|
||||
for _, polecatsDir := range rigDirs {
|
||||
rigDir := filepath.Dir(polecatsDir)
|
||||
beadsDB := filepath.Join(rigDir, "mayor", "rig", ".beads", "beads.db")
|
||||
|
||||
// Check if beads.db exists
|
||||
if _, err := os.Stat(beadsDB); err != nil {
|
||||
continue
|
||||
if _, err := os.Stat(beadsDB); err == nil {
|
||||
beadsDBS = append(beadsDBS, beadsDB)
|
||||
}
|
||||
}
|
||||
|
||||
// Query for agent beads with matching hook_bead
|
||||
for _, issueID := range issueIDs {
|
||||
if _, ok := result[issueID]; ok {
|
||||
continue // Already found a worker for this issue
|
||||
}
|
||||
if len(beadsDBS) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
// Query for agent bead with this hook_bead
|
||||
safeID := strings.ReplaceAll(issueID, "'", "''")
|
||||
query := fmt.Sprintf(
|
||||
`SELECT id, hook_bead, last_activity FROM issues WHERE issue_type = 'agent' AND status = 'open' AND hook_bead = '%s' LIMIT 1`,
|
||||
safeID)
|
||||
// Build the IN clause with properly escaped issue IDs
|
||||
var quotedIDs []string
|
||||
for _, id := range issueIDs {
|
||||
safeID := strings.ReplaceAll(id, "'", "''")
|
||||
quotedIDs = append(quotedIDs, fmt.Sprintf("'%s'", safeID))
|
||||
}
|
||||
inClause := strings.Join(quotedIDs, ", ")
|
||||
|
||||
queryCmd := exec.Command("sqlite3", "-json", beadsDB, query)
|
||||
// Batch query: fetch all matching agents in one query per rig
|
||||
query := fmt.Sprintf(
|
||||
`SELECT id, hook_bead, last_activity FROM issues WHERE issue_type = 'agent' AND status = 'open' AND hook_bead IN (%s)`,
|
||||
inClause)
|
||||
|
||||
// Query all rigs in parallel
|
||||
type rigResult struct {
|
||||
agents []struct {
|
||||
ID string `json:"id"`
|
||||
HookBead string `json:"hook_bead"`
|
||||
LastActivity string `json:"last_activity"`
|
||||
}
|
||||
}
|
||||
|
||||
resultChan := make(chan rigResult, len(beadsDBS))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, beadsDB := range beadsDBS {
|
||||
wg.Add(1)
|
||||
go func(db string) {
|
||||
defer wg.Done()
|
||||
|
||||
queryCmd := exec.Command("sqlite3", "-json", db, query)
|
||||
var stdout bytes.Buffer
|
||||
queryCmd.Stdout = &stdout
|
||||
if err := queryCmd.Run(); err != nil {
|
||||
continue
|
||||
resultChan <- rigResult{}
|
||||
return
|
||||
}
|
||||
|
||||
var agents []struct {
|
||||
ID string `json:"id"`
|
||||
HookBead string `json:"hook_bead"`
|
||||
LastActivity string `json:"last_activity"`
|
||||
var rr rigResult
|
||||
if err := json.Unmarshal(stdout.Bytes(), &rr.agents); err != nil {
|
||||
resultChan <- rigResult{}
|
||||
return
|
||||
}
|
||||
if err := json.Unmarshal(stdout.Bytes(), &agents); err != nil || len(agents) == 0 {
|
||||
resultChan <- rr
|
||||
}(beadsDB)
|
||||
}
|
||||
|
||||
// Wait for all queries to complete
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(resultChan)
|
||||
}()
|
||||
|
||||
// Collect results from all rigs
|
||||
for rr := range resultChan {
|
||||
for _, agent := range rr.agents {
|
||||
// Skip if we already found a worker for this issue
|
||||
if _, ok := result[agent.HookBead]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
agent := agents[0]
|
||||
|
||||
// Parse agent ID to get worker identity
|
||||
// Format: gt-<rig>-<role>-<name> or gt-<rig>-<name>
|
||||
workerID := parseWorkerFromAgentBead(agent.ID)
|
||||
if workerID == "" {
|
||||
continue
|
||||
@@ -1283,7 +1318,7 @@ func getWorkersForIssues(issueIDs []string) map[string]*workerInfo {
|
||||
}
|
||||
}
|
||||
|
||||
result[issueID] = &workerInfo{
|
||||
result[agent.HookBead] = &workerInfo{
|
||||
Worker: workerID,
|
||||
Age: age,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user