package beads

import (
	"bufio"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"time"
)

// MaxUnixSocketPath is the maximum length for Unix socket paths.
// socketPathForWorkspace compares the byte length of a candidate path
// against this value.
// NOTE(review): presumably sized for the 104-byte sun_path limit on
// macOS/BSD minus the trailing NUL; confirm against the daemon's own constant.
const MaxUnixSocketPath = 103

// rpcClient represents an RPC client for the bd daemon.
type rpcClient struct {
	conn       net.Conn      // open connection to the daemon socket
	socketPath string        // path of the Unix socket that was dialed
	timeout    time.Duration // per-request deadline; values <= 0 set no deadline
	cwd        string        // sent as the "cwd" field of every request
}

// rpcRequest represents an RPC request to the daemon.
// It is sent as a single line of JSON.
type rpcRequest struct {
	Operation string          `json:"operation"`
	Args      json.RawMessage `json:"args"`
	Cwd       string          `json:"cwd,omitempty"`
}

// rpcResponse represents an RPC response from the daemon.
// It is read as a single line of JSON.
type rpcResponse struct {
	Success bool            `json:"success"`
	Data    json.RawMessage `json:"data,omitempty"`
	Error   string          `json:"error,omitempty"`
}

// tryConnectRPC attempts to connect to the bd daemon for workspacePath.
//
// It returns nil when the socket file does not exist, the socket cannot be
// dialed, or the daemon fails the initial ping. On success the returned
// client uses a 30 second per-request timeout and sends workspacePath as the
// request cwd.
func tryConnectRPC(workspacePath string) *rpcClient {
	const (
		dialTimeout = 200 * time.Millisecond
		// pingTimeout bounds the health check so that a daemon which accepts
		// the connection but never answers cannot stall the caller for the
		// full request timeout.
		pingTimeout    = 2 * time.Second
		requestTimeout = 30 * time.Second
	)

	socketPath := socketPathForWorkspace(workspacePath)

	// Skip dialing entirely when there is no socket file.
	if _, err := os.Stat(socketPath); os.IsNotExist(err) {
		return nil
	}

	conn, err := net.DialTimeout("unix", socketPath, dialTimeout)
	if err != nil {
		return nil
	}

	client := &rpcClient{
		conn:       conn,
		socketPath: socketPath,
		timeout:    pingTimeout,
		cwd:        workspacePath,
	}

	// Quick health check, run under the short ping timeout.
	if err := client.ping(); err != nil {
		// Best effort: the connection is being abandoned, so a close error
		// carries no useful information for the caller.
		_ = conn.Close()
		return nil
	}

	// The daemon is responsive; real operations get the full request timeout.
	client.timeout = requestTimeout
	return client
}

// close closes the RPC connection.
// It is a no-op returning nil when the client has no connection.
func (c *rpcClient) close() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// execute sends a request and returns the response.
func (c *rpcClient) execute(operation string, args interface{}) (*rpcResponse, error) { argsJSON, err := json.Marshal(args) if err != nil { return nil, fmt.Errorf("marshaling args: %w", err) } req := rpcRequest{ Operation: operation, Args: argsJSON, Cwd: c.cwd, } reqJSON, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("marshaling request: %w", err) } if c.timeout > 0 { deadline := time.Now().Add(c.timeout) if err := c.conn.SetDeadline(deadline); err != nil { return nil, fmt.Errorf("setting deadline: %w", err) } } writer := bufio.NewWriter(c.conn) if _, err := writer.Write(reqJSON); err != nil { return nil, fmt.Errorf("writing request: %w", err) } if err := writer.WriteByte('\n'); err != nil { return nil, fmt.Errorf("writing newline: %w", err) } if err := writer.Flush(); err != nil { return nil, fmt.Errorf("flushing: %w", err) } reader := bufio.NewReader(c.conn) respLine, err := reader.ReadBytes('\n') if err != nil { return nil, fmt.Errorf("reading response: %w", err) } var resp rpcResponse if err := json.Unmarshal(respLine, &resp); err != nil { return nil, fmt.Errorf("unmarshaling response: %w", err) } if !resp.Success { return &resp, fmt.Errorf("operation failed: %s", resp.Error) } return &resp, nil } // ping verifies the daemon is alive. func (c *rpcClient) ping() error { _, err := c.execute("ping", nil) return err } // socketPathForWorkspace returns the socket path for a workspace. // This mirrors the logic in beads/internal/rpc/socket_path.go. 
func socketPathForWorkspace(workspacePath string) string { // Compute the "natural" socket path in .beads/ naturalPath := filepath.Join(workspacePath, ".beads", "bd.sock") // If natural path is short enough, use it if len(naturalPath) <= MaxUnixSocketPath { return naturalPath } // Path too long - use /tmp with hash hash := sha256.Sum256([]byte(workspacePath)) hashStr := hex.EncodeToString(hash[:4]) return filepath.Join("/tmp", "beads-"+hashStr, "bd.sock") } // getRPCClient returns the RPC client, initializing on first call. // Returns nil if daemon is not available. func (b *Beads) getRPCClient() *rpcClient { if b.rpcChecked { return b.rpcClient } b.rpcChecked = true // Don't use RPC in isolated mode (tests) if b.isolated { return nil } // Resolve workspace path for socket discovery workspacePath := b.beadsDir if workspacePath == "" { workspacePath = ResolveBeadsDir(b.workDir) } // Get the workspace root (parent of .beads) if filepath.Base(workspacePath) == ".beads" { workspacePath = filepath.Dir(workspacePath) } b.rpcClient = tryConnectRPC(workspacePath) b.rpcAvailable = b.rpcClient != nil return b.rpcClient } // closeRPC closes the RPC client if connected. 
func (b *Beads) closeRPC() { if b.rpcClient != nil { _ = b.rpcClient.close() b.rpcClient = nil } } // RPC operation argument types type rpcListArgs struct { Status string `json:"status,omitempty"` Assignee string `json:"assignee,omitempty"` Labels []string `json:"labels,omitempty"` LabelsAny []string `json:"labels_any,omitempty"` ExcludeStatus []string `json:"exclude_status,omitempty"` Priority *int `json:"priority,omitempty"` ParentID string `json:"parent_id,omitempty"` NoAssignee bool `json:"no_assignee,omitempty"` Limit int `json:"limit,omitempty"` } type rpcShowArgs struct { ID string `json:"id"` } type rpcUpdateArgs struct { ID string `json:"id"` Title *string `json:"title,omitempty"` Status *string `json:"status,omitempty"` Priority *int `json:"priority,omitempty"` Description *string `json:"description,omitempty"` Assignee *string `json:"assignee,omitempty"` AddLabels []string `json:"add_labels,omitempty"` RemoveLabels []string `json:"remove_labels,omitempty"` SetLabels []string `json:"set_labels,omitempty"` } type rpcCloseArgs struct { ID string `json:"id"` Reason string `json:"reason,omitempty"` Session string `json:"session,omitempty"` Force bool `json:"force,omitempty"` } // listViaRPC performs a list operation via the daemon RPC. 
func (b *Beads) listViaRPC(opts ListOptions) ([]*Issue, error) { client := b.getRPCClient() if client == nil { return nil, fmt.Errorf("no RPC client") } args := rpcListArgs{ Status: opts.Status, Assignee: opts.Assignee, ParentID: opts.Parent, } // Convert Label to Labels array if set if opts.Label != "" { args.Labels = []string{opts.Label} } // Handle priority: -1 means no filter if opts.Priority >= 0 { args.Priority = &opts.Priority } if opts.NoAssignee { args.NoAssignee = true } resp, err := client.execute("list", args) if err != nil { return nil, err } var issues []*Issue if err := json.Unmarshal(resp.Data, &issues); err != nil { return nil, fmt.Errorf("unmarshaling issues: %w", err) } return issues, nil } // showViaRPC performs a show operation via the daemon RPC. func (b *Beads) showViaRPC(id string) (*Issue, error) { client := b.getRPCClient() if client == nil { return nil, fmt.Errorf("no RPC client") } resp, err := client.execute("show", rpcShowArgs{ID: id}) if err != nil { return nil, err } var issue Issue if err := json.Unmarshal(resp.Data, &issue); err != nil { return nil, fmt.Errorf("unmarshaling issue: %w", err) } return &issue, nil } // updateViaRPC performs an update operation via the daemon RPC. func (b *Beads) updateViaRPC(id string, opts UpdateOptions) error { client := b.getRPCClient() if client == nil { return fmt.Errorf("no RPC client") } args := rpcUpdateArgs{ ID: id, Title: opts.Title, Status: opts.Status, Priority: opts.Priority, Description: opts.Description, Assignee: opts.Assignee, AddLabels: opts.AddLabels, RemoveLabels: opts.RemoveLabels, SetLabels: opts.SetLabels, } _, err := client.execute("update", args) return err } // closeViaRPC performs a close operation via the daemon RPC. 
func (b *Beads) closeViaRPC(id, reason, session string, force bool) error { client := b.getRPCClient() if client == nil { return fmt.Errorf("no RPC client") } args := rpcCloseArgs{ ID: id, Reason: reason, Session: session, Force: force, } _, err := client.execute("close", args) return err }