Replace bd subprocess calls in gt commands with daemon RPC when available.

Each subprocess call has ~40ms overhead for Go binary startup, so using the daemon's Unix socket protocol significantly reduces latency.

Changes:
- Add RPC client to beads package (beads_rpc.go)
- Modify List/Show/Update/Close methods to try RPC first, fall back to subprocess
- Replace runBdPrime() with direct content output (avoids bd subprocess)
- Change checkPendingEscalations() to use beads.List() with RPC
- Replace hook.go bd subprocess calls with beads package methods

The RPC client:
- Connects to daemon via Unix socket at .beads/bd.sock
- Uses JSON-based request/response protocol (same as bd daemon)
- Falls back gracefully to subprocess if daemon unavailable
- Lazy-initializes connection on first use

Performance improvement targets (from bd-2zd.2):
- gt prime < 100ms (was 5.8s with subprocess chain)
- gt hook < 100ms (was ~323ms)

Closes: bd-2zd.2
331 lines
7.9 KiB
Go
331 lines
7.9 KiB
Go
package beads
|
|
|
|
import (
|
|
"bufio"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
)
|
|
|
|
// MaxUnixSocketPath is the longest socket path, in bytes, that this package
// will hand to the operating system. socketPathForWorkspace compares the
// in-workspace socket path against this limit (inclusive) and switches to a
// short hashed path under /tmp when the limit is exceeded.
//
// NOTE(review): 103 presumably corresponds to the platform sun_path buffer
// minus its terminating NUL — confirm against the bd daemon's own constant.
const MaxUnixSocketPath = 103
|
|
|
|
// rpcClient is a connection to the bd daemon together with the settings
// applied to every request sent over it.
type rpcClient struct {
	// conn is the open connection to the daemon socket.
	conn net.Conn

	// socketPath is the filesystem path that was dialed to obtain conn.
	socketPath string

	// timeout bounds each request/response round trip; execute only sets a
	// deadline on conn when this is greater than zero.
	timeout time.Duration

	// cwd is sent with every request as its "cwd" field.
	cwd string
}
|
|
|
|
// rpcRequest is the JSON envelope written to the daemon for one operation.
type rpcRequest struct {
	// Operation names the daemon operation to run (e.g. "ping", "list").
	Operation string `json:"operation"`

	// Args carries the operation-specific arguments, already encoded as JSON.
	Args json.RawMessage `json:"args"`

	// Cwd is the caller's workspace path; it is omitted when empty.
	Cwd string `json:"cwd,omitempty"`
}
|
|
|
|
// rpcResponse is the JSON envelope read back from the daemon for one request.
type rpcResponse struct {
	// Success reports whether the daemon completed the operation.
	Success bool `json:"success"`

	// Data is the operation-specific payload, left undecoded for the caller.
	Data json.RawMessage `json:"data,omitempty"`

	// Error is the daemon's failure message; execute surfaces it when
	// Success is false.
	Error string `json:"error,omitempty"`
}
|
|
|
|
// tryConnectRPC attempts to connect to the bd daemon.
|
|
// Returns nil if no daemon is running.
|
|
func tryConnectRPC(workspacePath string) *rpcClient {
|
|
socketPath := socketPathForWorkspace(workspacePath)
|
|
|
|
// Check if socket exists
|
|
if _, err := os.Stat(socketPath); os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
conn, err := net.DialTimeout("unix", socketPath, 200*time.Millisecond)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
client := &rpcClient{
|
|
conn: conn,
|
|
socketPath: socketPath,
|
|
timeout: 30 * time.Second,
|
|
cwd: workspacePath,
|
|
}
|
|
|
|
// Quick health check
|
|
if err := client.ping(); err != nil {
|
|
_ = conn.Close()
|
|
return nil
|
|
}
|
|
|
|
return client
|
|
}
|
|
|
|
// close closes the RPC connection.
|
|
func (c *rpcClient) close() error {
|
|
if c.conn != nil {
|
|
return c.conn.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// execute sends a request and returns the response.
|
|
func (c *rpcClient) execute(operation string, args interface{}) (*rpcResponse, error) {
|
|
argsJSON, err := json.Marshal(args)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling args: %w", err)
|
|
}
|
|
|
|
req := rpcRequest{
|
|
Operation: operation,
|
|
Args: argsJSON,
|
|
Cwd: c.cwd,
|
|
}
|
|
|
|
reqJSON, err := json.Marshal(req)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshaling request: %w", err)
|
|
}
|
|
|
|
if c.timeout > 0 {
|
|
deadline := time.Now().Add(c.timeout)
|
|
if err := c.conn.SetDeadline(deadline); err != nil {
|
|
return nil, fmt.Errorf("setting deadline: %w", err)
|
|
}
|
|
}
|
|
|
|
writer := bufio.NewWriter(c.conn)
|
|
if _, err := writer.Write(reqJSON); err != nil {
|
|
return nil, fmt.Errorf("writing request: %w", err)
|
|
}
|
|
if err := writer.WriteByte('\n'); err != nil {
|
|
return nil, fmt.Errorf("writing newline: %w", err)
|
|
}
|
|
if err := writer.Flush(); err != nil {
|
|
return nil, fmt.Errorf("flushing: %w", err)
|
|
}
|
|
|
|
reader := bufio.NewReader(c.conn)
|
|
respLine, err := reader.ReadBytes('\n')
|
|
if err != nil {
|
|
return nil, fmt.Errorf("reading response: %w", err)
|
|
}
|
|
|
|
var resp rpcResponse
|
|
if err := json.Unmarshal(respLine, &resp); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling response: %w", err)
|
|
}
|
|
|
|
if !resp.Success {
|
|
return &resp, fmt.Errorf("operation failed: %s", resp.Error)
|
|
}
|
|
|
|
return &resp, nil
|
|
}
|
|
|
|
// ping verifies the daemon is alive.
|
|
func (c *rpcClient) ping() error {
|
|
_, err := c.execute("ping", nil)
|
|
return err
|
|
}
|
|
|
|
// socketPathForWorkspace returns the socket path for a workspace.
|
|
// This mirrors the logic in beads/internal/rpc/socket_path.go.
|
|
func socketPathForWorkspace(workspacePath string) string {
|
|
// Compute the "natural" socket path in .beads/
|
|
naturalPath := filepath.Join(workspacePath, ".beads", "bd.sock")
|
|
|
|
// If natural path is short enough, use it
|
|
if len(naturalPath) <= MaxUnixSocketPath {
|
|
return naturalPath
|
|
}
|
|
|
|
// Path too long - use /tmp with hash
|
|
hash := sha256.Sum256([]byte(workspacePath))
|
|
hashStr := hex.EncodeToString(hash[:4])
|
|
return filepath.Join("/tmp", "beads-"+hashStr, "bd.sock")
|
|
}
|
|
|
|
// getRPCClient returns the RPC client, initializing on first call.
|
|
// Returns nil if daemon is not available.
|
|
func (b *Beads) getRPCClient() *rpcClient {
|
|
if b.rpcChecked {
|
|
return b.rpcClient
|
|
}
|
|
|
|
b.rpcChecked = true
|
|
|
|
// Don't use RPC in isolated mode (tests)
|
|
if b.isolated {
|
|
return nil
|
|
}
|
|
|
|
// Resolve workspace path for socket discovery
|
|
workspacePath := b.beadsDir
|
|
if workspacePath == "" {
|
|
workspacePath = ResolveBeadsDir(b.workDir)
|
|
}
|
|
|
|
// Get the workspace root (parent of .beads)
|
|
if filepath.Base(workspacePath) == ".beads" {
|
|
workspacePath = filepath.Dir(workspacePath)
|
|
}
|
|
|
|
b.rpcClient = tryConnectRPC(workspacePath)
|
|
b.rpcAvailable = b.rpcClient != nil
|
|
return b.rpcClient
|
|
}
|
|
|
|
// closeRPC closes the RPC client if connected.
|
|
func (b *Beads) closeRPC() {
|
|
if b.rpcClient != nil {
|
|
_ = b.rpcClient.close()
|
|
b.rpcClient = nil
|
|
}
|
|
}
|
|
|
|
// RPC operation argument types
|
|
|
|
// rpcListArgs is the argument payload for the daemon's "list" operation.
// Every field is omitted from the JSON when it holds its zero value.
//
// listViaRPC populates Status, Assignee, Labels, Priority, ParentID and
// NoAssignee; the remaining fields are not set anywhere in this file.
type rpcListArgs struct {
	// Status filters by issue status.
	Status string `json:"status,omitempty"`

	// Assignee filters by assignee.
	Assignee string `json:"assignee,omitempty"`

	// Labels filters by label.
	Labels []string `json:"labels,omitempty"`

	// LabelsAny is sent as "labels_any".
	LabelsAny []string `json:"labels_any,omitempty"`

	// ExcludeStatus is sent as "exclude_status".
	ExcludeStatus []string `json:"exclude_status,omitempty"`

	// Priority filters by priority; a nil pointer means no priority filter.
	Priority *int `json:"priority,omitempty"`

	// ParentID filters by parent issue.
	ParentID string `json:"parent_id,omitempty"`

	// NoAssignee is sent as "no_assignee".
	NoAssignee bool `json:"no_assignee,omitempty"`

	// Limit is sent as "limit".
	Limit int `json:"limit,omitempty"`
}
|
|
|
|
// rpcShowArgs is the argument payload for the daemon's "show" operation.
type rpcShowArgs struct {
	// ID identifies the issue to fetch; it is always present in the JSON.
	ID string `json:"id"`
}
|
|
|
|
// rpcUpdateArgs is the argument payload for the daemon's "update" operation.
// Apart from ID, every field is optional: nil pointers and empty slices are
// left out of the JSON entirely.
type rpcUpdateArgs struct {
	// ID identifies the issue to modify; it is always present in the JSON.
	ID string `json:"id"`

	// Title, when non-nil, is sent as "title".
	Title *string `json:"title,omitempty"`

	// Status, when non-nil, is sent as "status".
	Status *string `json:"status,omitempty"`

	// Priority, when non-nil, is sent as "priority".
	Priority *int `json:"priority,omitempty"`

	// Description, when non-nil, is sent as "description".
	Description *string `json:"description,omitempty"`

	// Assignee, when non-nil, is sent as "assignee".
	Assignee *string `json:"assignee,omitempty"`

	// AddLabels is sent as "add_labels".
	AddLabels []string `json:"add_labels,omitempty"`

	// RemoveLabels is sent as "remove_labels".
	RemoveLabels []string `json:"remove_labels,omitempty"`

	// SetLabels is sent as "set_labels".
	SetLabels []string `json:"set_labels,omitempty"`
}
|
|
|
|
// rpcCloseArgs is the argument payload for the daemon's "close" operation.
type rpcCloseArgs struct {
	// ID identifies the issue to close; it is always present in the JSON.
	ID string `json:"id"`

	// Reason is sent as "reason" and omitted when empty.
	Reason string `json:"reason,omitempty"`

	// Session is sent as "session" and omitted when empty.
	Session string `json:"session,omitempty"`

	// Force is sent as "force" and omitted when false.
	Force bool `json:"force,omitempty"`
}
|
|
|
|
// listViaRPC performs a list operation via the daemon RPC.
|
|
func (b *Beads) listViaRPC(opts ListOptions) ([]*Issue, error) {
|
|
client := b.getRPCClient()
|
|
if client == nil {
|
|
return nil, fmt.Errorf("no RPC client")
|
|
}
|
|
|
|
args := rpcListArgs{
|
|
Status: opts.Status,
|
|
Assignee: opts.Assignee,
|
|
ParentID: opts.Parent,
|
|
}
|
|
|
|
// Convert Label to Labels array if set
|
|
if opts.Label != "" {
|
|
args.Labels = []string{opts.Label}
|
|
}
|
|
|
|
// Handle priority: -1 means no filter
|
|
if opts.Priority >= 0 {
|
|
args.Priority = &opts.Priority
|
|
}
|
|
|
|
if opts.NoAssignee {
|
|
args.NoAssignee = true
|
|
}
|
|
|
|
resp, err := client.execute("list", args)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var issues []*Issue
|
|
if err := json.Unmarshal(resp.Data, &issues); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling issues: %w", err)
|
|
}
|
|
|
|
return issues, nil
|
|
}
|
|
|
|
// showViaRPC performs a show operation via the daemon RPC.
|
|
func (b *Beads) showViaRPC(id string) (*Issue, error) {
|
|
client := b.getRPCClient()
|
|
if client == nil {
|
|
return nil, fmt.Errorf("no RPC client")
|
|
}
|
|
|
|
resp, err := client.execute("show", rpcShowArgs{ID: id})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var issue Issue
|
|
if err := json.Unmarshal(resp.Data, &issue); err != nil {
|
|
return nil, fmt.Errorf("unmarshaling issue: %w", err)
|
|
}
|
|
|
|
return &issue, nil
|
|
}
|
|
|
|
// updateViaRPC performs an update operation via the daemon RPC.
|
|
func (b *Beads) updateViaRPC(id string, opts UpdateOptions) error {
|
|
client := b.getRPCClient()
|
|
if client == nil {
|
|
return fmt.Errorf("no RPC client")
|
|
}
|
|
|
|
args := rpcUpdateArgs{
|
|
ID: id,
|
|
Title: opts.Title,
|
|
Status: opts.Status,
|
|
Priority: opts.Priority,
|
|
Description: opts.Description,
|
|
Assignee: opts.Assignee,
|
|
AddLabels: opts.AddLabels,
|
|
RemoveLabels: opts.RemoveLabels,
|
|
SetLabels: opts.SetLabels,
|
|
}
|
|
|
|
_, err := client.execute("update", args)
|
|
return err
|
|
}
|
|
|
|
// closeViaRPC performs a close operation via the daemon RPC.
|
|
func (b *Beads) closeViaRPC(id, reason, session string, force bool) error {
|
|
client := b.getRPCClient()
|
|
if client == nil {
|
|
return fmt.Errorf("no RPC client")
|
|
}
|
|
|
|
args := rpcCloseArgs{
|
|
ID: id,
|
|
Reason: reason,
|
|
Session: session,
|
|
Force: force,
|
|
}
|
|
|
|
_, err := client.execute("close", args)
|
|
return err
|
|
}
|