perf(rpc): use bd daemon protocol to reduce subprocess overhead

Replace bd subprocess calls in gt commands with daemon RPC when available.
Each subprocess call has ~40ms overhead for Go binary startup, so using
the daemon's Unix socket protocol significantly reduces latency.

Changes:
- Add RPC client to beads package (beads_rpc.go)
- Modify List/Show/Update/Close methods to try RPC first, fall back to subprocess
- Replace runBdPrime() with direct content output (avoids bd subprocess)
- Replace checkPendingEscalations() to use beads.List() with RPC
- Replace hook.go bd subprocess calls with beads package methods

The RPC client:
- Connects to daemon via Unix socket at .beads/bd.sock
- Uses JSON-based request/response protocol (same as bd daemon)
- Falls back gracefully to subprocess if daemon unavailable
- Lazy-initializes connection on first use

Performance improvement targets (from bd-2zd.2):
- gt prime < 100ms (was 5.8s with subprocess chain)
- gt hook < 100ms (was ~323ms)

Closes: bd-2zd.2
This commit is contained in:
obsidian
2026-01-24 17:13:07 -08:00
committed by John Ogle
parent c4b74ee7bf
commit 55048333c6
4 changed files with 440 additions and 66 deletions

View File

@@ -119,6 +119,12 @@ type Beads struct {
// Populated on first call to getTownRoot() to avoid filesystem walk on every operation.
townRoot string
searchedRoot bool
// RPC client for daemon communication (lazy-initialized).
// When available, RPC is preferred over subprocess for performance.
rpcClient *rpcClient
rpcChecked bool
rpcAvailable bool
}
// New creates a new Beads wrapper for the given directory.
@@ -287,7 +293,14 @@ func filterBeadsEnv(environ []string) []string {
}
// List returns issues matching the given options.
// Uses daemon RPC when available for better performance (~40ms faster).
func (b *Beads) List(opts ListOptions) ([]*Issue, error) {
// Try RPC first (faster when daemon is running)
if issues, err := b.listViaRPC(opts); err == nil {
return issues, nil
}
// Fall back to subprocess
args := []string{"list", "--json"}
if opts.Status != "" {
@@ -400,7 +413,14 @@ func (b *Beads) ReadyWithType(issueType string) ([]*Issue, error) {
}
// Show returns detailed information about an issue.
// Uses daemon RPC when available for better performance (~40ms faster).
func (b *Beads) Show(id string) (*Issue, error) {
// Try RPC first (faster when daemon is running)
if issue, err := b.showViaRPC(id); err == nil {
return issue, nil
}
// Fall back to subprocess
out, err := b.run("show", id, "--json")
if err != nil {
return nil, err
@@ -559,7 +579,14 @@ func (b *Beads) CreateWithID(id string, opts CreateOptions) (*Issue, error) {
}
// Update updates an existing issue.
// Uses daemon RPC when available for better performance (~40ms faster).
func (b *Beads) Update(id string, opts UpdateOptions) error {
// Try RPC first (faster when daemon is running)
if err := b.updateViaRPC(id, opts); err == nil {
return nil
}
// Fall back to subprocess
args := []string{"update", id}
if opts.Title != nil {
@@ -598,15 +625,26 @@ func (b *Beads) Update(id string, opts UpdateOptions) error {
// Close closes one or more issues.
// If a runtime session ID is set in the environment, it is passed to bd close
// for work attribution tracking (see decision 009-session-events-architecture.md).
// Uses daemon RPC when available for better performance (~40ms faster per call).
func (b *Beads) Close(ids ...string) error {
if len(ids) == 0 {
return nil
}
sessionID := runtime.SessionIDFromEnv()
// Try RPC for single-issue closes (faster when daemon is running)
if len(ids) == 1 {
if err := b.closeViaRPC(ids[0], "", sessionID, false); err == nil {
return nil
}
}
// Fall back to subprocess
args := append([]string{"close"}, ids...)
// Pass session ID for work attribution if available
if sessionID := runtime.SessionIDFromEnv(); sessionID != "" {
if sessionID != "" {
args = append(args, "--session="+sessionID)
}
@@ -617,16 +655,51 @@ func (b *Beads) Close(ids ...string) error {
// CloseWithReason closes one or more issues with a reason.
// If a runtime session ID is set in the environment, it is passed to bd close
// for work attribution tracking (see decision 009-session-events-architecture.md).
// Uses daemon RPC when available for better performance (~40ms faster per call).
func (b *Beads) CloseWithReason(reason string, ids ...string) error {
if len(ids) == 0 {
return nil
}
sessionID := runtime.SessionIDFromEnv()
// Try RPC for single-issue closes (faster when daemon is running)
if len(ids) == 1 {
if err := b.closeViaRPC(ids[0], reason, sessionID, false); err == nil {
return nil
}
}
// Fall back to subprocess
args := append([]string{"close"}, ids...)
args = append(args, "--reason="+reason)
// Pass session ID for work attribution if available
if sessionID := runtime.SessionIDFromEnv(); sessionID != "" {
if sessionID != "" {
args = append(args, "--session="+sessionID)
}
_, err := b.run(args...)
return err
}
// CloseForced closes an issue with force flag and optional reason.
// The force flag bypasses blockers and other validation checks.
// Uses daemon RPC when available for better performance (~40ms faster).
func (b *Beads) CloseForced(id, reason string) error {
sessionID := runtime.SessionIDFromEnv()
// Try RPC first (faster when daemon is running)
if err := b.closeViaRPC(id, reason, sessionID, true); err == nil {
return nil
}
// Fall back to subprocess
args := []string{"close", id, "--force"}
if reason != "" {
args = append(args, "--reason="+reason)
}
if sessionID != "" {
args = append(args, "--session="+sessionID)
}
@@ -798,3 +871,19 @@ func ProvisionPrimeMDForWorktree(worktreePath string) error {
// Provision PRIME.md in the target directory
return ProvisionPrimeMD(beadsDir)
}
// GetPrimeContent returns the beads workflow context content.
// It checks for a custom PRIME.md file first, otherwise returns the default.
// This eliminates the need to spawn a bd subprocess for gt prime.
func GetPrimeContent(workDir string) string {
beadsDir := ResolveBeadsDir(workDir)
primePath := filepath.Join(beadsDir, "PRIME.md")
// Check for custom PRIME.md
if content, err := os.ReadFile(primePath); err == nil {
return strings.TrimSpace(string(content))
}
// Return default content
return strings.TrimSpace(primeContent)
}

330
internal/beads/beads_rpc.go Normal file
View File

@@ -0,0 +1,330 @@
package beads
import (
"bufio"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"time"
)
// MaxUnixSocketPath is the maximum length for Unix socket paths.
const MaxUnixSocketPath = 103
// rpcClient represents an RPC client for the bd daemon.
type rpcClient struct {
conn net.Conn
socketPath string
timeout time.Duration
cwd string
}
// rpcRequest represents an RPC request to the daemon.
type rpcRequest struct {
Operation string `json:"operation"`
Args json.RawMessage `json:"args"`
Cwd string `json:"cwd,omitempty"`
}
// rpcResponse represents an RPC response from the daemon.
type rpcResponse struct {
Success bool `json:"success"`
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// tryConnectRPC attempts to connect to the bd daemon.
// Returns nil if no daemon is running.
func tryConnectRPC(workspacePath string) *rpcClient {
socketPath := socketPathForWorkspace(workspacePath)
// Check if socket exists
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
return nil
}
conn, err := net.DialTimeout("unix", socketPath, 200*time.Millisecond)
if err != nil {
return nil
}
client := &rpcClient{
conn: conn,
socketPath: socketPath,
timeout: 30 * time.Second,
cwd: workspacePath,
}
// Quick health check
if err := client.ping(); err != nil {
_ = conn.Close()
return nil
}
return client
}
// close closes the RPC connection.
func (c *rpcClient) close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// execute sends a request and returns the response.
func (c *rpcClient) execute(operation string, args interface{}) (*rpcResponse, error) {
argsJSON, err := json.Marshal(args)
if err != nil {
return nil, fmt.Errorf("marshaling args: %w", err)
}
req := rpcRequest{
Operation: operation,
Args: argsJSON,
Cwd: c.cwd,
}
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshaling request: %w", err)
}
if c.timeout > 0 {
deadline := time.Now().Add(c.timeout)
if err := c.conn.SetDeadline(deadline); err != nil {
return nil, fmt.Errorf("setting deadline: %w", err)
}
}
writer := bufio.NewWriter(c.conn)
if _, err := writer.Write(reqJSON); err != nil {
return nil, fmt.Errorf("writing request: %w", err)
}
if err := writer.WriteByte('\n'); err != nil {
return nil, fmt.Errorf("writing newline: %w", err)
}
if err := writer.Flush(); err != nil {
return nil, fmt.Errorf("flushing: %w", err)
}
reader := bufio.NewReader(c.conn)
respLine, err := reader.ReadBytes('\n')
if err != nil {
return nil, fmt.Errorf("reading response: %w", err)
}
var resp rpcResponse
if err := json.Unmarshal(respLine, &resp); err != nil {
return nil, fmt.Errorf("unmarshaling response: %w", err)
}
if !resp.Success {
return &resp, fmt.Errorf("operation failed: %s", resp.Error)
}
return &resp, nil
}
// ping verifies the daemon is alive.
func (c *rpcClient) ping() error {
_, err := c.execute("ping", nil)
return err
}
// socketPathForWorkspace returns the socket path for a workspace.
// This mirrors the logic in beads/internal/rpc/socket_path.go.
func socketPathForWorkspace(workspacePath string) string {
// Compute the "natural" socket path in .beads/
naturalPath := filepath.Join(workspacePath, ".beads", "bd.sock")
// If natural path is short enough, use it
if len(naturalPath) <= MaxUnixSocketPath {
return naturalPath
}
// Path too long - use /tmp with hash
hash := sha256.Sum256([]byte(workspacePath))
hashStr := hex.EncodeToString(hash[:4])
return filepath.Join("/tmp", "beads-"+hashStr, "bd.sock")
}
// getRPCClient returns the RPC client, initializing on first call.
// Returns nil if daemon is not available.
func (b *Beads) getRPCClient() *rpcClient {
if b.rpcChecked {
return b.rpcClient
}
b.rpcChecked = true
// Don't use RPC in isolated mode (tests)
if b.isolated {
return nil
}
// Resolve workspace path for socket discovery
workspacePath := b.beadsDir
if workspacePath == "" {
workspacePath = ResolveBeadsDir(b.workDir)
}
// Get the workspace root (parent of .beads)
if filepath.Base(workspacePath) == ".beads" {
workspacePath = filepath.Dir(workspacePath)
}
b.rpcClient = tryConnectRPC(workspacePath)
b.rpcAvailable = b.rpcClient != nil
return b.rpcClient
}
// closeRPC closes the RPC client if connected.
func (b *Beads) closeRPC() {
if b.rpcClient != nil {
_ = b.rpcClient.close()
b.rpcClient = nil
}
}
// RPC operation argument types
type rpcListArgs struct {
Status string `json:"status,omitempty"`
Assignee string `json:"assignee,omitempty"`
Labels []string `json:"labels,omitempty"`
LabelsAny []string `json:"labels_any,omitempty"`
ExcludeStatus []string `json:"exclude_status,omitempty"`
Priority *int `json:"priority,omitempty"`
ParentID string `json:"parent_id,omitempty"`
NoAssignee bool `json:"no_assignee,omitempty"`
Limit int `json:"limit,omitempty"`
}
type rpcShowArgs struct {
ID string `json:"id"`
}
type rpcUpdateArgs struct {
ID string `json:"id"`
Title *string `json:"title,omitempty"`
Status *string `json:"status,omitempty"`
Priority *int `json:"priority,omitempty"`
Description *string `json:"description,omitempty"`
Assignee *string `json:"assignee,omitempty"`
AddLabels []string `json:"add_labels,omitempty"`
RemoveLabels []string `json:"remove_labels,omitempty"`
SetLabels []string `json:"set_labels,omitempty"`
}
type rpcCloseArgs struct {
ID string `json:"id"`
Reason string `json:"reason,omitempty"`
Session string `json:"session,omitempty"`
Force bool `json:"force,omitempty"`
}
// listViaRPC performs a list operation via the daemon RPC.
func (b *Beads) listViaRPC(opts ListOptions) ([]*Issue, error) {
client := b.getRPCClient()
if client == nil {
return nil, fmt.Errorf("no RPC client")
}
args := rpcListArgs{
Status: opts.Status,
Assignee: opts.Assignee,
ParentID: opts.Parent,
}
// Convert Label to Labels array if set
if opts.Label != "" {
args.Labels = []string{opts.Label}
}
// Handle priority: -1 means no filter
if opts.Priority >= 0 {
args.Priority = &opts.Priority
}
if opts.NoAssignee {
args.NoAssignee = true
}
resp, err := client.execute("list", args)
if err != nil {
return nil, err
}
var issues []*Issue
if err := json.Unmarshal(resp.Data, &issues); err != nil {
return nil, fmt.Errorf("unmarshaling issues: %w", err)
}
return issues, nil
}
// showViaRPC performs a show operation via the daemon RPC.
func (b *Beads) showViaRPC(id string) (*Issue, error) {
client := b.getRPCClient()
if client == nil {
return nil, fmt.Errorf("no RPC client")
}
resp, err := client.execute("show", rpcShowArgs{ID: id})
if err != nil {
return nil, err
}
var issue Issue
if err := json.Unmarshal(resp.Data, &issue); err != nil {
return nil, fmt.Errorf("unmarshaling issue: %w", err)
}
return &issue, nil
}
// updateViaRPC performs an update operation via the daemon RPC.
func (b *Beads) updateViaRPC(id string, opts UpdateOptions) error {
client := b.getRPCClient()
if client == nil {
return fmt.Errorf("no RPC client")
}
args := rpcUpdateArgs{
ID: id,
Title: opts.Title,
Status: opts.Status,
Priority: opts.Priority,
Description: opts.Description,
Assignee: opts.Assignee,
AddLabels: opts.AddLabels,
RemoveLabels: opts.RemoveLabels,
SetLabels: opts.SetLabels,
}
_, err := client.execute("update", args)
return err
}
// closeViaRPC performs a close operation via the daemon RPC.
func (b *Beads) closeViaRPC(id, reason, session string, force bool) error {
client := b.getRPCClient()
if client == nil {
return fmt.Errorf("no RPC client")
}
args := rpcCloseArgs{
ID: id,
Reason: reason,
Session: session,
Force: force,
}
_, err := client.execute("close", args)
return err
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/spf13/cobra"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/events"
"github.com/steveyegge/gastown/internal/runtime"
"github.com/steveyegge/gastown/internal/style"
"github.com/steveyegge/gastown/internal/workspace"
)
@@ -185,15 +184,8 @@ func runHook(_ *cobra.Command, args []string) error {
fmt.Printf("%s Replacing completed bead %s...\n", style.Dim.Render(""), existing.ID)
if !hookDryRun {
if hasAttachment {
// Close completed molecule bead (use bd close --force for pinned)
closeArgs := []string{"close", existing.ID, "--force",
"--reason=Auto-replaced by gt hook (molecule complete)"}
if sessionID := runtime.SessionIDFromEnv(); sessionID != "" {
closeArgs = append(closeArgs, "--session="+sessionID)
}
closeCmd := exec.Command("bd", closeArgs...)
closeCmd.Stderr = os.Stderr
if err := closeCmd.Run(); err != nil {
// Close completed molecule bead (use force for pinned)
if err := b.CloseForced(existing.ID, "Auto-replaced by gt hook (molecule complete)"); err != nil {
return fmt.Errorf("closing completed bead %s: %w", existing.ID, err)
}
} else {
@@ -234,15 +226,9 @@ func runHook(_ *cobra.Command, args []string) error {
return nil
}
// Hook the bead using bd update (discovery-based approach)
// Run from town root so bd can find routes.jsonl for prefix-based routing.
// This is essential for hooking convoys (hq-* prefix) stored in town beads.
hookCmd := exec.Command("bd", "update", beadID, "--status=hooked", "--assignee="+agentID)
if townRoot, err := workspace.FindFromCwd(); err == nil {
hookCmd.Dir = townRoot
}
hookCmd.Stderr = os.Stderr
if err := hookCmd.Run(); err != nil {
// Hook the bead using beads package (uses RPC when daemon available)
status := beads.StatusHooked
if err := b.Update(beadID, beads.UpdateOptions{Status: &status, Assignee: &agentID}); err != nil {
return fmt.Errorf("hooking bead: %w", err)
}

View File

@@ -2,7 +2,6 @@ package cmd
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
@@ -340,29 +339,13 @@ func detectRole(cwd, townRoot string) RoleInfo {
return ctx
}
// runBdPrime runs `bd prime` and outputs the result.
// This provides beads workflow context to the agent.
// runBdPrime outputs beads workflow context directly.
// This replaces the bd subprocess call to eliminate ~40ms startup overhead.
func runBdPrime(workDir string) {
cmd := exec.Command("bd", "prime")
cmd.Dir = workDir
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
// Skip if bd prime fails (beads might not be available)
// But log stderr if present for debugging
if errMsg := strings.TrimSpace(stderr.String()); errMsg != "" {
fmt.Fprintf(os.Stderr, "bd prime: %s\n", errMsg)
}
return
}
output := strings.TrimSpace(stdout.String())
if output != "" {
content := beads.GetPrimeContent(workDir)
if content != "" {
fmt.Println()
fmt.Println(output)
fmt.Println(content)
}
}
@@ -706,34 +689,20 @@ func ensureBeadsRedirect(ctx RoleContext) {
// checkPendingEscalations queries for open escalation beads and displays them prominently.
// This is called on Mayor startup to surface issues needing human attention.
// Uses beads package which leverages RPC when daemon is available.
func checkPendingEscalations(ctx RoleContext) {
// Query for open escalations using bd list with tag filter
cmd := exec.Command("bd", "list", "--status=open", "--tag=escalation", "--json")
cmd.Dir = ctx.WorkDir
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
// Query for open escalations using beads package (uses RPC when available)
b := beads.New(ctx.WorkDir)
escalations, err := b.List(beads.ListOptions{
Status: "open",
Label: "escalation",
Priority: -1,
})
if err != nil || len(escalations) == 0 {
// Silently skip - escalation check is best-effort
return
}
// Parse JSON output
var escalations []struct {
ID string `json:"id"`
Title string `json:"title"`
Priority int `json:"priority"`
Description string `json:"description"`
Created string `json:"created"`
}
if err := json.Unmarshal(stdout.Bytes(), &escalations); err != nil || len(escalations) == 0 {
// No escalations or parse error
return
}
// Count by severity
critical := 0
high := 0