Files
gastown/internal/beads/beads_rpc.go
mayor d0036b0768
Some checks failed
CI / Check for .beads changes (push) Has been skipped
CI / Check embedded formulas (push) Failing after 19s
CI / Test (push) Failing after 1m12s
CI / Lint (push) Failing after 22s
CI / Integration Tests (push) Failing after 36s
CI / Coverage Report (push) Has been skipped
Windows CI / Windows Build and Unit Tests (push) Has been cancelled
fix(beads): handle deprecated Type field in listViaRPC
The RPC path for List operations was not converting opts.Type to a label,
causing all issues to be returned when the daemon was running. This resulted
in gt mq list showing all beads (including wisps) instead of just merge-requests.

The subprocess fallback path already performed this conversion, but the RPC path did not.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 10:36:43 -08:00

335 lines
8.1 KiB
Go

package beads
import (
"bufio"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"time"
)
// MaxUnixSocketPath is the maximum length, in bytes, of a Unix socket path
// that socketPathForWorkspace will use directly. Longer paths are replaced by
// a short hashed path under /tmp.
// NOTE(review): 103 presumably leaves room for the trailing NUL in a 104-byte
// sun_path (macOS/BSD) — confirm against the bd daemon's own limit.
const MaxUnixSocketPath = 103
// rpcClient represents an RPC client for the bd daemon.
// Requests and responses travel over conn as newline-terminated JSON
// documents (see execute).
type rpcClient struct {
// conn is the connection to the daemon; close tolerates it being nil.
conn net.Conn
// socketPath is the socket path that was dialed to obtain conn.
socketPath string
// timeout bounds each execute call through a connection deadline; a value
// <= 0 means no deadline is set by execute.
timeout time.Duration
// cwd is sent as the Cwd field of every request.
cwd string
}
// rpcRequest represents an RPC request to the daemon.
// execute marshals it to JSON and writes it followed by a single newline.
type rpcRequest struct {
// Operation names the daemon operation; this file sends "ping", "list",
// "show", "update" and "close".
Operation string `json:"operation"`
// Args holds the already-encoded, operation-specific arguments.
Args json.RawMessage `json:"args"`
// Cwd is filled from rpcClient.cwd and left out of the JSON when empty.
Cwd string `json:"cwd,omitempty"`
}
// rpcResponse represents an RPC response from the daemon.
// execute decodes one newline-terminated JSON document into this type.
type rpcResponse struct {
// Success reports whether the operation succeeded; when false, execute
// returns an error built from Error.
Success bool `json:"success"`
// Data is the raw operation result; callers decode it into the type they
// expect (for example a list of issues or a single issue).
Data json.RawMessage `json:"data,omitempty"`
// Error is the failure message reported by the daemon.
Error string `json:"error,omitempty"`
}
// tryConnectRPC dials the bd daemon socket that belongs to workspacePath and
// returns a client that has already answered a ping.
//
// It returns nil when the socket file does not exist, when the dial does not
// succeed within 200ms, or when the ping fails (the connection is closed in
// that last case).
//
// NOTE(review): the ping runs under the same 30s timeout as regular calls, so
// a daemon that accepts the connection but never answers can hold this
// function for that long.
func tryConnectRPC(workspacePath string) *rpcClient {
	const (
		dialTimeout = 200 * time.Millisecond
		callTimeout = 30 * time.Second
	)

	path := socketPathForWorkspace(workspacePath)

	// No socket file: treat the daemon as not running and skip the dial.
	_, statErr := os.Stat(path)
	if os.IsNotExist(statErr) {
		return nil
	}

	conn, dialErr := net.DialTimeout("unix", path, dialTimeout)
	if dialErr != nil {
		return nil
	}

	c := &rpcClient{
		conn:       conn,
		socketPath: path,
		timeout:    callTimeout,
		cwd:        workspacePath,
	}

	// Only hand out clients whose daemon actually responds.
	if pingErr := c.ping(); pingErr != nil {
		_ = conn.Close() // best-effort cleanup; the caller only sees nil
		return nil
	}
	return c
}
// close shuts down the underlying connection and returns its Close error.
// A client without a connection is a no-op that returns nil.
func (c *rpcClient) close() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}
// execute performs one request/response round trip with the daemon.
//
// args is JSON-encoded and embedded in an rpcRequest together with the
// client's cwd; the request is written as a single line and exactly one
// response line is read back. When the client has a positive timeout, the
// whole round trip shares one connection deadline.
//
// It returns the decoded response and a nil error on success. If the daemon
// reports failure, both the response and an "operation failed" error are
// returned. Encoding, I/O and decoding problems yield a nil response and a
// wrapped error.
func (c *rpcClient) execute(operation string, args interface{}) (*rpcResponse, error) {
	encodedArgs, err := json.Marshal(args)
	if err != nil {
		return nil, fmt.Errorf("marshaling args: %w", err)
	}

	payload, err := json.Marshal(rpcRequest{
		Operation: operation,
		Args:      encodedArgs,
		Cwd:       c.cwd,
	})
	if err != nil {
		return nil, fmt.Errorf("marshaling request: %w", err)
	}

	// One deadline covers both the write and the read below.
	if c.timeout > 0 {
		if err := c.conn.SetDeadline(time.Now().Add(c.timeout)); err != nil {
			return nil, fmt.Errorf("setting deadline: %w", err)
		}
	}

	// The protocol is line-oriented: request JSON, then a newline.
	out := bufio.NewWriter(c.conn)
	if _, err := out.Write(payload); err != nil {
		return nil, fmt.Errorf("writing request: %w", err)
	}
	if err := out.WriteByte('\n'); err != nil {
		return nil, fmt.Errorf("writing newline: %w", err)
	}
	if err := out.Flush(); err != nil {
		return nil, fmt.Errorf("flushing: %w", err)
	}

	line, err := bufio.NewReader(c.conn).ReadBytes('\n')
	if err != nil {
		return nil, fmt.Errorf("reading response: %w", err)
	}

	resp := new(rpcResponse)
	if err := json.Unmarshal(line, resp); err != nil {
		return nil, fmt.Errorf("unmarshaling response: %w", err)
	}
	if resp.Success {
		return resp, nil
	}
	// Keep the response alongside the error so callers can still inspect it.
	return resp, fmt.Errorf("operation failed: %s", resp.Error)
}
// ping sends a "ping" operation with no arguments and reports whether the
// daemon answered successfully; the response payload is discarded.
func (c *rpcClient) ping() error {
	if _, err := c.execute("ping", nil); err != nil {
		return err
	}
	return nil
}
// socketPathForWorkspace maps a workspace directory to its daemon socket path.
// This mirrors the logic in beads/internal/rpc/socket_path.go.
//
// The preferred location is <workspace>/.beads/bd.sock. When that path is
// longer than MaxUnixSocketPath bytes, the result is
// /tmp/beads-<hash>/bd.sock instead, where <hash> is the hex form of the
// first four bytes of the SHA-256 of workspacePath.
func socketPathForWorkspace(workspacePath string) string {
	if direct := filepath.Join(workspacePath, ".beads", "bd.sock"); len(direct) <= MaxUnixSocketPath {
		return direct
	}

	// Too long for a socket address: fall back to a short, stable name that
	// is derived from the workspace path.
	digest := sha256.Sum256([]byte(workspacePath))
	shortDir := "beads-" + hex.EncodeToString(digest[:4])
	return filepath.Join("/tmp", shortDir, "bd.sock")
}
// getRPCClient returns the cached daemon client, attempting the connection
// only on the first call. Every later call returns whatever that first
// attempt left in b.rpcClient, including nil.
//
// It returns nil when the instance is isolated (tests) or when no daemon
// could be reached.
func (b *Beads) getRPCClient() *rpcClient {
	if b.rpcChecked {
		return b.rpcClient
	}
	b.rpcChecked = true

	// Isolated instances never talk to a daemon.
	if b.isolated {
		return nil
	}

	// Socket discovery works from the workspace root, i.e. the directory
	// that contains .beads.
	root := b.beadsDir
	if root == "" {
		root = ResolveBeadsDir(b.workDir)
	}
	if filepath.Base(root) == ".beads" {
		root = filepath.Dir(root)
	}

	client := tryConnectRPC(root)
	b.rpcClient = client
	b.rpcAvailable = client != nil
	return client
}
// closeRPC closes the RPC client if connected and clears the cached client.
//
// rpcAvailable is reset together with rpcClient so the flag keeps meaning
// "a client is currently held", matching how getRPCClient sets it.
// rpcChecked is left untouched, so a later getRPCClient call does not
// reconnect. Calling closeRPC without a client is a no-op.
func (b *Beads) closeRPC() {
	if b.rpcClient == nil {
		return
	}
	// Best-effort: there is nothing useful a caller could do with a close error.
	_ = b.rpcClient.close()
	b.rpcClient = nil
	b.rpcAvailable = false
}
// RPC operation argument types
//
// rpcListArgs is the argument payload for the "list" operation. Every field
// is omitted from the JSON when it holds its zero value.
type rpcListArgs struct {
// Status is copied from ListOptions.Status.
Status string `json:"status,omitempty"`
// Assignee is copied from ListOptions.Assignee.
Assignee string `json:"assignee,omitempty"`
// Labels holds at most one entry when built by listViaRPC: the requested
// label, or "gt:"+Type for the deprecated Type option.
Labels []string `json:"labels,omitempty"`
// LabelsAny is not populated by listViaRPC.
// NOTE(review): presumably an any-of label filter on the daemon side — confirm.
LabelsAny []string `json:"labels_any,omitempty"`
// ExcludeStatus is not populated by listViaRPC.
ExcludeStatus []string `json:"exclude_status,omitempty"`
// Priority is nil when no priority filter is requested (a negative
// ListOptions.Priority).
Priority *int `json:"priority,omitempty"`
// ParentID is copied from ListOptions.Parent.
ParentID string `json:"parent_id,omitempty"`
// NoAssignee is copied from ListOptions.NoAssignee.
NoAssignee bool `json:"no_assignee,omitempty"`
// Limit is not populated by listViaRPC.
Limit int `json:"limit,omitempty"`
}
// rpcShowArgs is the argument payload for the "show" operation.
type rpcShowArgs struct {
// ID is the issue ID passed to showViaRPC; it is always sent, even when empty.
ID string `json:"id"`
}
// rpcUpdateArgs is the argument payload for the "update" operation.
// updateViaRPC fills it from the issue ID and the matching UpdateOptions
// fields. Nil pointers and empty slices are omitted from the JSON.
// NOTE(review): an omitted field presumably means "leave unchanged" on the
// daemon side — confirm.
type rpcUpdateArgs struct {
// ID is the issue to update; it is always sent.
ID string `json:"id"`
Title *string `json:"title,omitempty"`
Status *string `json:"status,omitempty"`
Priority *int `json:"priority,omitempty"`
Description *string `json:"description,omitempty"`
Assignee *string `json:"assignee,omitempty"`
// Label changes, copied from UpdateOptions.AddLabels, RemoveLabels and
// SetLabels respectively.
AddLabels []string `json:"add_labels,omitempty"`
RemoveLabels []string `json:"remove_labels,omitempty"`
SetLabels []string `json:"set_labels,omitempty"`
}
// rpcCloseArgs is the argument payload for the "close" operation.
// closeViaRPC fills it directly from its parameters.
type rpcCloseArgs struct {
// ID is the issue to close; it is always sent.
ID string `json:"id"`
// Reason, Session and Force are omitted from the JSON when empty/false.
Reason string `json:"reason,omitempty"`
Session string `json:"session,omitempty"`
Force bool `json:"force,omitempty"`
}
// listViaRPC runs the "list" operation against the daemon and decodes the
// result into issues.
//
// Filters are taken from opts: Status, Assignee, Parent and NoAssignee are
// passed through; Label becomes the single entry of Labels, and the
// deprecated Type is sent as the label "gt:"+Type only when Label is empty;
// a negative Priority means "no priority filter".
//
// It returns an error when no daemon client is available, when the call
// fails, or when the response data cannot be decoded.
func (b *Beads) listViaRPC(opts ListOptions) ([]*Issue, error) {
	client := b.getRPCClient()
	if client == nil {
		return nil, fmt.Errorf("no RPC client")
	}

	args := rpcListArgs{
		Status:     opts.Status,
		Assignee:   opts.Assignee,
		ParentID:   opts.Parent,
		NoAssignee: opts.NoAssignee,
	}

	// Label takes precedence; Type is only honoured for backward compatibility.
	switch {
	case opts.Label != "":
		args.Labels = []string{opts.Label}
	case opts.Type != "":
		args.Labels = []string{"gt:" + opts.Type}
	}

	// Leave Priority nil (omitted from the request) unless a filter was asked for.
	if opts.Priority >= 0 {
		priority := opts.Priority
		args.Priority = &priority
	}

	resp, err := client.execute("list", args)
	if err != nil {
		return nil, err
	}

	var found []*Issue
	if err := json.Unmarshal(resp.Data, &found); err != nil {
		return nil, fmt.Errorf("unmarshaling issues: %w", err)
	}
	return found, nil
}
// showViaRPC fetches a single issue by id through the daemon's "show"
// operation.
//
// It returns an error when no daemon client is available, when the call
// fails, or when the response data cannot be decoded into an Issue.
func (b *Beads) showViaRPC(id string) (*Issue, error) {
	client := b.getRPCClient()
	if client == nil {
		return nil, fmt.Errorf("no RPC client")
	}

	resp, err := client.execute("show", rpcShowArgs{ID: id})
	if err != nil {
		return nil, err
	}

	issue := new(Issue)
	if err := json.Unmarshal(resp.Data, issue); err != nil {
		return nil, fmt.Errorf("unmarshaling issue: %w", err)
	}
	return issue, nil
}
// updateViaRPC sends the "update" operation for issue id, forwarding every
// field of opts unchanged. The response payload is ignored.
//
// It returns an error when no daemon client is available or when the call
// fails.
func (b *Beads) updateViaRPC(id string, opts UpdateOptions) error {
	client := b.getRPCClient()
	if client == nil {
		return fmt.Errorf("no RPC client")
	}

	if _, err := client.execute("update", rpcUpdateArgs{
		ID:           id,
		Title:        opts.Title,
		Status:       opts.Status,
		Priority:     opts.Priority,
		Description:  opts.Description,
		Assignee:     opts.Assignee,
		AddLabels:    opts.AddLabels,
		RemoveLabels: opts.RemoveLabels,
		SetLabels:    opts.SetLabels,
	}); err != nil {
		return err
	}
	return nil
}
// closeViaRPC sends the "close" operation for issue id, passing reason,
// session and force through as given. The response payload is ignored.
//
// It returns an error when no daemon client is available or when the call
// fails.
func (b *Beads) closeViaRPC(id, reason, session string, force bool) error {
	client := b.getRPCClient()
	if client == nil {
		return fmt.Errorf("no RPC client")
	}

	if _, err := client.execute("close", rpcCloseArgs{
		ID:      id,
		Reason:  reason,
		Session: session,
		Force:   force,
	}); err != nil {
		return err
	}
	return nil
}