From 6f0282f1c66996254c2218f2136fb637bfadbdd4 Mon Sep 17 00:00:00 2001 From: obsidian Date: Sat, 24 Jan 2026 17:13:07 -0800 Subject: [PATCH] perf(rpc): use bd daemon protocol to reduce subprocess overhead Replace bd subprocess calls in gt commands with daemon RPC when available. Each subprocess call has ~40ms overhead for Go binary startup, so using the daemon's Unix socket protocol significantly reduces latency. Changes: - Add RPC client to beads package (beads_rpc.go) - Modify List/Show/Update/Close methods to try RPC first, fall back to subprocess - Replace runBdPrime() with direct content output (avoids bd subprocess) - Replace checkPendingEscalations() to use beads.List() with RPC - Replace hook.go bd subprocess calls with beads package methods The RPC client: - Connects to daemon via Unix socket at .beads/bd.sock - Uses JSON-based request/response protocol (same as bd daemon) - Falls back gracefully to subprocess if daemon unavailable - Lazy-initializes connection on first use Performance improvement targets (from bd-2zd.2): - gt prime < 100ms (was 5.8s with subprocess chain) - gt hook < 100ms (was ~323ms) Closes: bd-2zd.2 --- internal/beads/beads.go | 93 +++++++++- internal/beads/beads_rpc.go | 330 ++++++++++++++++++++++++++++++++++++ internal/cmd/hook.go | 24 +-- internal/cmd/prime.go | 59 ++----- 4 files changed, 440 insertions(+), 66 deletions(-) create mode 100644 internal/beads/beads_rpc.go diff --git a/internal/beads/beads.go b/internal/beads/beads.go index f8e6b420..a7706f3b 100644 --- a/internal/beads/beads.go +++ b/internal/beads/beads.go @@ -119,6 +119,12 @@ type Beads struct { // Populated on first call to getTownRoot() to avoid filesystem walk on every operation. townRoot string searchedRoot bool + + // RPC client for daemon communication (lazy-initialized). + // When available, RPC is preferred over subprocess for performance. + rpcClient *rpcClient + rpcChecked bool + rpcAvailable bool } // New creates a new Beads wrapper for the given directory. @@ -287,7 +293,14 @@ func filterBeadsEnv(environ []string) []string { } // List returns issues matching the given options. +// Uses daemon RPC when available for better performance (~40ms faster). func (b *Beads) List(opts ListOptions) ([]*Issue, error) { + // Try RPC first (faster when daemon is running) + if issues, err := b.listViaRPC(opts); err == nil { + return issues, nil + } + + // Fall back to subprocess args := []string{"list", "--json"} if opts.Status != "" { @@ -400,7 +413,14 @@ func (b *Beads) ReadyWithType(issueType string) ([]*Issue, error) { } // Show returns detailed information about an issue. +// Uses daemon RPC when available for better performance (~40ms faster). func (b *Beads) Show(id string) (*Issue, error) { + // Try RPC first (faster when daemon is running) + if issue, err := b.showViaRPC(id); err == nil { + return issue, nil + } + + // Fall back to subprocess out, err := b.run("show", id, "--json") if err != nil { return nil, err @@ -559,7 +579,14 @@ func (b *Beads) CreateWithID(id string, opts CreateOptions) (*Issue, error) { } // Update updates an existing issue. +// Uses daemon RPC when available for better performance (~40ms faster). func (b *Beads) Update(id string, opts UpdateOptions) error { + // Try RPC first (faster when daemon is running) + if err := b.updateViaRPC(id, opts); err == nil { + return nil + } + + // Fall back to subprocess args := []string{"update", id} if opts.Title != nil { @@ -598,15 +625,26 @@ func (b *Beads) Update(id string, opts UpdateOptions) error { // Close closes one or more issues. // If a runtime session ID is set in the environment, it is passed to bd close // for work attribution tracking (see decision 009-session-events-architecture.md). +// Uses daemon RPC when available for better performance (~40ms faster per call). func (b *Beads) Close(ids ...string) error { if len(ids) == 0 { return nil } + sessionID := runtime.SessionIDFromEnv() + + // Try RPC for single-issue closes (faster when daemon is running) + if len(ids) == 1 { + if err := b.closeViaRPC(ids[0], "", sessionID, false); err == nil { + return nil + } + } + + // Fall back to subprocess args := append([]string{"close"}, ids...) // Pass session ID for work attribution if available - if sessionID := runtime.SessionIDFromEnv(); sessionID != "" { + if sessionID != "" { args = append(args, "--session="+sessionID) } @@ -617,16 +655,51 @@ func (b *Beads) Close(ids ...string) error { // CloseWithReason closes one or more issues with a reason. // If a runtime session ID is set in the environment, it is passed to bd close // for work attribution tracking (see decision 009-session-events-architecture.md). +// Uses daemon RPC when available for better performance (~40ms faster per call). func (b *Beads) CloseWithReason(reason string, ids ...string) error { if len(ids) == 0 { return nil } + sessionID := runtime.SessionIDFromEnv() + + // Try RPC for single-issue closes (faster when daemon is running) + if len(ids) == 1 { + if err := b.closeViaRPC(ids[0], reason, sessionID, false); err == nil { + return nil + } + } + + // Fall back to subprocess args := append([]string{"close"}, ids...) args = append(args, "--reason="+reason) // Pass session ID for work attribution if available - if sessionID := runtime.SessionIDFromEnv(); sessionID != "" { + if sessionID != "" { + args = append(args, "--session="+sessionID) + } + + _, err := b.run(args...) + return err +} + +// CloseForced closes an issue with force flag and optional reason. +// The force flag bypasses blockers and other validation checks. +// Uses daemon RPC when available for better performance (~40ms faster). +func (b *Beads) CloseForced(id, reason string) error { + sessionID := runtime.SessionIDFromEnv() + + // Try RPC first (faster when daemon is running) + if err := b.closeViaRPC(id, reason, sessionID, true); err == nil { + return nil + } + + // Fall back to subprocess + args := []string{"close", id, "--force"} + if reason != "" { + args = append(args, "--reason="+reason) + } + if sessionID != "" { args = append(args, "--session="+sessionID) } @@ -800,3 +873,19 @@ func ProvisionPrimeMDForWorktree(worktreePath string) error { // Provision PRIME.md in the target directory return ProvisionPrimeMD(beadsDir) } + +// GetPrimeContent returns the beads workflow context content. +// It checks for a custom PRIME.md file first, otherwise returns the default. +// This eliminates the need to spawn a bd subprocess for gt prime. +func GetPrimeContent(workDir string) string { + beadsDir := ResolveBeadsDir(workDir) + primePath := filepath.Join(beadsDir, "PRIME.md") + + // Check for custom PRIME.md + if content, err := os.ReadFile(primePath); err == nil { + return strings.TrimSpace(string(content)) + } + + // Return default content + return strings.TrimSpace(primeContent) +} diff --git a/internal/beads/beads_rpc.go b/internal/beads/beads_rpc.go new file mode 100644 index 00000000..fd712a02 --- /dev/null +++ b/internal/beads/beads_rpc.go @@ -0,0 +1,330 @@ +package beads + +import ( + "bufio" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "time" +) + +// MaxUnixSocketPath is the maximum length for Unix socket paths. +const MaxUnixSocketPath = 103 + +// rpcClient represents an RPC client for the bd daemon. +type rpcClient struct { + conn net.Conn + socketPath string + timeout time.Duration + cwd string +} + +// rpcRequest represents an RPC request to the daemon. +type rpcRequest struct { + Operation string `json:"operation"` + Args json.RawMessage `json:"args"` + Cwd string `json:"cwd,omitempty"` +} + +// rpcResponse represents an RPC response from the daemon. +type rpcResponse struct { + Success bool `json:"success"` + Data json.RawMessage `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} + +// tryConnectRPC attempts to connect to the bd daemon. +// Returns nil if no daemon is running. +func tryConnectRPC(workspacePath string) *rpcClient { + socketPath := socketPathForWorkspace(workspacePath) + + // Check if socket exists + if _, err := os.Stat(socketPath); os.IsNotExist(err) { + return nil + } + + conn, err := net.DialTimeout("unix", socketPath, 200*time.Millisecond) + if err != nil { + return nil + } + + client := &rpcClient{ + conn: conn, + socketPath: socketPath, + timeout: 30 * time.Second, + cwd: workspacePath, + } + + // Quick health check + if err := client.ping(); err != nil { + _ = conn.Close() + return nil + } + + return client +} + +// close closes the RPC connection. +func (c *rpcClient) close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// execute sends a request and returns the response. +func (c *rpcClient) execute(operation string, args interface{}) (*rpcResponse, error) { + argsJSON, err := json.Marshal(args) + if err != nil { + return nil, fmt.Errorf("marshaling args: %w", err) + } + + req := rpcRequest{ + Operation: operation, + Args: argsJSON, + Cwd: c.cwd, + } + + reqJSON, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshaling request: %w", err) + } + + if c.timeout > 0 { + deadline := time.Now().Add(c.timeout) + if err := c.conn.SetDeadline(deadline); err != nil { + return nil, fmt.Errorf("setting deadline: %w", err) + } + } + + writer := bufio.NewWriter(c.conn) + if _, err := writer.Write(reqJSON); err != nil { + return nil, fmt.Errorf("writing request: %w", err) + } + if err := writer.WriteByte('\n'); err != nil { + return nil, fmt.Errorf("writing newline: %w", err) + } + if err := writer.Flush(); err != nil { + return nil, fmt.Errorf("flushing: %w", err) + } + + reader := bufio.NewReader(c.conn) + respLine, err := reader.ReadBytes('\n') + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + + var resp rpcResponse + if err := json.Unmarshal(respLine, &resp); err != nil { + return nil, fmt.Errorf("unmarshaling response: %w", err) + } + + if !resp.Success { + return &resp, fmt.Errorf("operation failed: %s", resp.Error) + } + + return &resp, nil +} + +// ping verifies the daemon is alive. +func (c *rpcClient) ping() error { + _, err := c.execute("ping", nil) + return err +} + +// socketPathForWorkspace returns the socket path for a workspace. +// This mirrors the logic in beads/internal/rpc/socket_path.go. +func socketPathForWorkspace(workspacePath string) string { + // Compute the "natural" socket path in .beads/ + naturalPath := filepath.Join(workspacePath, ".beads", "bd.sock") + + // If natural path is short enough, use it + if len(naturalPath) <= MaxUnixSocketPath { + return naturalPath + } + + // Path too long - use /tmp with hash + hash := sha256.Sum256([]byte(workspacePath)) + hashStr := hex.EncodeToString(hash[:4]) + return filepath.Join("/tmp", "beads-"+hashStr, "bd.sock") +} + +// getRPCClient returns the RPC client, initializing on first call. +// Returns nil if daemon is not available. +func (b *Beads) getRPCClient() *rpcClient { + if b.rpcChecked { + return b.rpcClient + } + + b.rpcChecked = true + + // Don't use RPC in isolated mode (tests) + if b.isolated { + return nil + } + + // Resolve workspace path for socket discovery + workspacePath := b.beadsDir + if workspacePath == "" { + workspacePath = ResolveBeadsDir(b.workDir) + } + + // Get the workspace root (parent of .beads) + if filepath.Base(workspacePath) == ".beads" { + workspacePath = filepath.Dir(workspacePath) + } + + b.rpcClient = tryConnectRPC(workspacePath) + b.rpcAvailable = b.rpcClient != nil + return b.rpcClient +} + +// closeRPC closes the RPC client if connected. +func (b *Beads) closeRPC() { + if b.rpcClient != nil { + _ = b.rpcClient.close() + b.rpcClient = nil + } +} + +// RPC operation argument types + +type rpcListArgs struct { + Status string `json:"status,omitempty"` + Assignee string `json:"assignee,omitempty"` + Labels []string `json:"labels,omitempty"` + LabelsAny []string `json:"labels_any,omitempty"` + ExcludeStatus []string `json:"exclude_status,omitempty"` + Priority *int `json:"priority,omitempty"` + ParentID string `json:"parent_id,omitempty"` + NoAssignee bool `json:"no_assignee,omitempty"` + Limit int `json:"limit,omitempty"` +} + +type rpcShowArgs struct { + ID string `json:"id"` +} + +type rpcUpdateArgs struct { + ID string `json:"id"` + Title *string `json:"title,omitempty"` + Status *string `json:"status,omitempty"` + Priority *int `json:"priority,omitempty"` + Description *string `json:"description,omitempty"` + Assignee *string `json:"assignee,omitempty"` + AddLabels []string `json:"add_labels,omitempty"` + RemoveLabels []string `json:"remove_labels,omitempty"` + SetLabels []string `json:"set_labels,omitempty"` +} + +type rpcCloseArgs struct { + ID string `json:"id"` + Reason string `json:"reason,omitempty"` + Session string `json:"session,omitempty"` + Force bool `json:"force,omitempty"` +} + +// listViaRPC performs a list operation via the daemon RPC. +func (b *Beads) listViaRPC(opts ListOptions) ([]*Issue, error) { + client := b.getRPCClient() + if client == nil { + return nil, fmt.Errorf("no RPC client") + } + + args := rpcListArgs{ + Status: opts.Status, + Assignee: opts.Assignee, + ParentID: opts.Parent, + } + + // Convert Label to Labels array if set + if opts.Label != "" { + args.Labels = []string{opts.Label} + } + + // Handle priority: -1 means no filter + if opts.Priority >= 0 { + args.Priority = &opts.Priority + } + + if opts.NoAssignee { + args.NoAssignee = true + } + + resp, err := client.execute("list", args) + if err != nil { + return nil, err + } + + var issues []*Issue + if err := json.Unmarshal(resp.Data, &issues); err != nil { + return nil, fmt.Errorf("unmarshaling issues: %w", err) + } + + return issues, nil +} + +// showViaRPC performs a show operation via the daemon RPC. +func (b *Beads) showViaRPC(id string) (*Issue, error) { + client := b.getRPCClient() + if client == nil { + return nil, fmt.Errorf("no RPC client") + } + + resp, err := client.execute("show", rpcShowArgs{ID: id}) + if err != nil { + return nil, err + } + + var issue Issue + if err := json.Unmarshal(resp.Data, &issue); err != nil { + return nil, fmt.Errorf("unmarshaling issue: %w", err) + } + + return &issue, nil +} + +// updateViaRPC performs an update operation via the daemon RPC. +func (b *Beads) updateViaRPC(id string, opts UpdateOptions) error { + client := b.getRPCClient() + if client == nil { + return fmt.Errorf("no RPC client") + } + + args := rpcUpdateArgs{ + ID: id, + Title: opts.Title, + Status: opts.Status, + Priority: opts.Priority, + Description: opts.Description, + Assignee: opts.Assignee, + AddLabels: opts.AddLabels, + RemoveLabels: opts.RemoveLabels, + SetLabels: opts.SetLabels, + } + + _, err := client.execute("update", args) + return err +} + +// closeViaRPC performs a close operation via the daemon RPC. +func (b *Beads) closeViaRPC(id, reason, session string, force bool) error { + client := b.getRPCClient() + if client == nil { + return fmt.Errorf("no RPC client") + } + + args := rpcCloseArgs{ + ID: id, + Reason: reason, + Session: session, + Force: force, + } + + _, err := client.execute("close", args) + return err +} diff --git a/internal/cmd/hook.go b/internal/cmd/hook.go index 0d955b82..7da9191c 100644 --- a/internal/cmd/hook.go +++ b/internal/cmd/hook.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/cobra" "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/events" - "github.com/steveyegge/gastown/internal/runtime" "github.com/steveyegge/gastown/internal/style" "github.com/steveyegge/gastown/internal/workspace" ) @@ -185,15 +184,8 @@ func runHook(_ *cobra.Command, args []string) error { fmt.Printf("%s Replacing completed bead %s...\n", style.Dim.Render("ℹ"), existing.ID) if !hookDryRun { if hasAttachment { - // Close completed molecule bead (use bd close --force for pinned) - closeArgs := []string{"close", existing.ID, "--force", - "--reason=Auto-replaced by gt hook (molecule complete)"} - if sessionID := runtime.SessionIDFromEnv(); sessionID != "" { - closeArgs = append(closeArgs, "--session="+sessionID) - } - closeCmd := exec.Command("bd", closeArgs...) - closeCmd.Stderr = os.Stderr - if err := closeCmd.Run(); err != nil { + // Close completed molecule bead (use force for pinned) + if err := b.CloseForced(existing.ID, "Auto-replaced by gt hook (molecule complete)"); err != nil { return fmt.Errorf("closing completed bead %s: %w", existing.ID, err) } } else { @@ -234,15 +226,9 @@ func runHook(_ *cobra.Command, args []string) error { return nil } - // Hook the bead using bd update (discovery-based approach) - // Run from town root so bd can find routes.jsonl for prefix-based routing. - // This is essential for hooking convoys (hq-* prefix) stored in town beads. - hookCmd := exec.Command("bd", "update", beadID, "--status=hooked", "--assignee="+agentID) - if townRoot, err := workspace.FindFromCwd(); err == nil { - hookCmd.Dir = townRoot - } - hookCmd.Stderr = os.Stderr - if err := hookCmd.Run(); err != nil { + // Hook the bead using beads package (uses RPC when daemon available) + status := beads.StatusHooked + if err := b.Update(beadID, beads.UpdateOptions{Status: &status, Assignee: &agentID}); err != nil { return fmt.Errorf("hooking bead: %w", err) } diff --git a/internal/cmd/prime.go b/internal/cmd/prime.go index a66beb63..400e9a21 100644 --- a/internal/cmd/prime.go +++ b/internal/cmd/prime.go @@ -2,7 +2,6 @@ package cmd import ( "bytes" - "encoding/json" "errors" "fmt" "os" @@ -340,29 +339,13 @@ func detectRole(cwd, townRoot string) RoleInfo { return ctx } -// runBdPrime runs `bd prime` and outputs the result. -// This provides beads workflow context to the agent. +// runBdPrime outputs beads workflow context directly. +// This replaces the bd subprocess call to eliminate ~40ms startup overhead. func runBdPrime(workDir string) { - cmd := exec.Command("bd", "prime") - cmd.Dir = workDir - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - if err := cmd.Run(); err != nil { - // Skip if bd prime fails (beads might not be available) - // But log stderr if present for debugging - if errMsg := strings.TrimSpace(stderr.String()); errMsg != "" { - fmt.Fprintf(os.Stderr, "bd prime: %s\n", errMsg) - } - return - } - - output := strings.TrimSpace(stdout.String()) - if output != "" { + content := beads.GetPrimeContent(workDir) + if content != "" { fmt.Println() - fmt.Println(output) + fmt.Println(content) } } @@ -706,34 +689,20 @@ func ensureBeadsRedirect(ctx RoleContext) { // checkPendingEscalations queries for open escalation beads and displays them prominently. // This is called on Mayor startup to surface issues needing human attention. +// Uses beads package which leverages RPC when daemon is available. func checkPendingEscalations(ctx RoleContext) { - // Query for open escalations using bd list with tag filter - cmd := exec.Command("bd", "list", "--status=open", "--tag=escalation", "--json") - cmd.Dir = ctx.WorkDir - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - if err := cmd.Run(); err != nil { + // Query for open escalations using beads package (uses RPC when available) + b := beads.New(ctx.WorkDir) + escalations, err := b.List(beads.ListOptions{ + Status: "open", + Label: "escalation", + Priority: -1, + }) + if err != nil || len(escalations) == 0 { // Silently skip - escalation check is best-effort return } - // Parse JSON output - var escalations []struct { - ID string `json:"id"` - Title string `json:"title"` - Priority int `json:"priority"` - Description string `json:"description"` - Created string `json:"created"` - } - - if err := json.Unmarshal(stdout.Bytes(), &escalations); err != nil || len(escalations) == 0 { - // No escalations or parse error - return - } - // Count by severity critical := 0 high := 0