* feat(ready,blocked): Add --parent flag for scoping by epic/bead descendants
Add a `--parent` flag to the `bd ready` and `bd blocked` CLI commands and MCP tools
to filter results to all descendants of a specific epic or bead.
## Backward Compatibility
- CLI: New optional --parent flag; existing usage unchanged
- RPC: New `blocked` operation added (was missing); existing operations unchanged
- MCP: New optional `parent` parameter; existing calls work as before
- Storage interface: GetBlockedIssues signature changed to accept a WorkFilter
  - All callers updated to pass an empty filter, preserving existing behavior
  - An empty WorkFilter{} returns results identical to the previous implementation
## Implementation Details
The SQLite backend uses a recursive CTE to traverse the parent-child hierarchy:
```sql
WITH RECURSIVE descendants AS (
    SELECT issue_id FROM dependencies
    WHERE type = 'parent-child' AND depends_on_id = ?
    UNION ALL
    SELECT d.issue_id FROM dependencies d
    JOIN descendants dt ON d.depends_on_id = dt.issue_id
    WHERE d.type = 'parent-child'
)
SELECT issue_id FROM descendants
```
MemoryStorage implements an equivalent recursive traversal, with visited-set
cycle protection, via a collectDescendants helper.
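A minimal sketch of that traversal, assuming an illustrative dependency record (the real records live in internal/types; the type and field names here are stand-ins):

```go
// Dependency is an illustrative stand-in for the real dependency record.
type Dependency struct {
	IssueID     string // the child issue
	DependsOnID string // the parent, for parent-child edges
	Type        string // e.g. "parent-child"
}

// collectDescendants recursively gathers every issue reachable from parentID
// via parent-child edges. The visited set guards against cycles, so a
// malformed hierarchy cannot cause infinite recursion.
func collectDescendants(parentID string, deps []Dependency, visited map[string]bool) []string {
	var out []string
	for _, d := range deps {
		if d.Type != "parent-child" || d.DependsOnID != parentID {
			continue
		}
		if visited[d.IssueID] {
			continue // already collected via another path (cycle or diamond)
		}
		visited[d.IssueID] = true
		out = append(out, d.IssueID)
		out = append(out, collectDescendants(d.IssueID, deps, visited)...)
	}
	return out
}
```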
The parent filter composes with the existing filters (priority, labels, assignee,
etc.) as an additional WHERE clause; all filters are ANDed together.
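Roughly, a composed query looks like this (the table and column names follow the CTE above; the base ready-work conditions are illustrative):

```sql
WITH RECURSIVE descendants AS (
    SELECT issue_id FROM dependencies
    WHERE type = 'parent-child' AND depends_on_id = ?   -- the --parent ID
    UNION ALL
    SELECT d.issue_id FROM dependencies d
    JOIN descendants dt ON d.depends_on_id = dt.issue_id
    WHERE d.type = 'parent-child'
)
SELECT i.*
FROM issues i
WHERE i.status = 'open'                                 -- illustrative base condition
  AND i.priority = ?                                    -- existing filter, e.g. --priority 1
  AND i.id IN (SELECT issue_id FROM descendants)        -- new parent filter, ANDed on
```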
## RPC Blocked Support
The MCP beads_blocked() tool existed, but the daemon client raised
NotImplementedError. Added OpBlocked and handleBlocked to enable the previously
broken daemon RPC path; both the CLI and daemon clients now handle blocked queries.
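A sketch of the new handler, following the shape of the existing handlers in the RPC server code below; the BlockedArgs field name `Filter` is an assumption:

```go
func (s *Server) handleBlocked(req *Request) Response {
	// Parse optional args (mirrors the other handlers' arg handling).
	var args BlockedArgs
	if len(req.Args) > 0 {
		if err := json.Unmarshal(req.Args, &args); err != nil {
			return Response{Success: false, Error: fmt.Sprintf("invalid args: %v", err)}
		}
	}

	// args.Filter is an assumed field carrying the WorkFilter (ParentID included).
	issues, err := s.storage.GetBlockedIssues(s.reqCtx(req), args.Filter)
	if err != nil {
		return Response{Success: false, Error: fmt.Sprintf("failed to get blocked issues: %v", err)}
	}

	data, _ := json.Marshal(issues)
	return Response{Success: true, Data: data}
}
```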
## Changes
- internal/types/types.go: Add ParentID *string to WorkFilter (sketched after this list)
- internal/storage/sqlite/ready.go: Add recursive CTE for parent filtering
- internal/storage/memory/memory.go: Add getAllDescendants/collectDescendants
- internal/storage/storage.go: Update GetBlockedIssues interface signature
- cmd/bd/ready.go: Add --parent flag to ready and blocked commands
- internal/rpc/protocol.go: Add OpBlocked constant and BlockedArgs type
- internal/rpc/server_issues_epics.go: Add handleBlocked RPC handler
- internal/rpc/client.go: Add Blocked client method
- integrations/beads-mcp/: Add BlockedParams model and parent parameter
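A sketch of the WorkFilter change; only ParentID is new, and the other fields are abbreviated stand-ins for the existing filter set in internal/types/types.go:

```go
// WorkFilter narrows ready/blocked queries. All set fields are ANDed together.
type WorkFilter struct {
	Priority *int     // existing: filter by priority (abbreviated)
	Assignee *string  // existing: filter by assignee (abbreviated)
	Labels   []string // existing: filter by labels (abbreviated)

	// ParentID, when non-nil, restricts results to descendants of the
	// given epic/bead via the recursive parent-child traversal.
	ParentID *string
}
```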
## Usage
```
bd ready --parent bd-abc                 # All ready descendants
bd ready --parent bd-abc --priority 1    # Combined with other filters
bd blocked --parent bd-abc               # All blocked descendants
```
## Testing
Added four test cases for parent filtering (a condensed sketch of the first follows the list):
- TestParentIDFilterDescendants: Verifies recursive traversal (grandchildren)
- TestParentIDWithOtherFilters: Verifies composition with priority filter
- TestParentIDWithBlockedDescendants: Verifies blocked issues excluded from ready
- TestParentIDEmptyParent: Verifies empty result for childless parent
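The gist of the recursive-traversal test, compressed; newTestStore, mustCreate, mustLink, and GetReadyWork are hypothetical stand-ins for the suite's actual helpers and storage method:

```go
func TestParentIDFilterDescendants(t *testing.T) {
	ctx := context.Background()
	store := newTestStore(t) // hypothetical: fresh in-memory storage

	epic := mustCreate(t, store, "epic")   // root parent
	child := mustCreate(t, store, "child") // epic -> child
	grand := mustCreate(t, store, "grand") // child -> grand
	mustLink(t, store, child, epic)        // hypothetical: add parent-child edge
	mustLink(t, store, grand, child)

	ready, err := store.GetReadyWork(ctx, types.WorkFilter{ParentID: &epic})
	if err != nil {
		t.Fatal(err)
	}
	// Both the direct child and the grandchild must appear: the filter
	// recurses through the hierarchy, not just direct children.
	if len(ready) != 2 {
		t.Fatalf("want 2 descendants, got %d", len(ready))
	}
}
```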
* fix: Correct blockedCmd indentation and suppress gosec false positive
- Fix a syntax error caused by incorrect indentation in the blockedCmd Run function
- Add a nolint:gosec comment for the GetBlockedIssues SQL formatting warning (G201):
  the filterSQL variable contains only parameterized WHERE clauses with ? placeholders,
  not user input (see the sketch below)
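The suppression, roughly as it sits in the query assembly (abbreviated; buildBlockedQuery and the base query here are illustrative):

```go
// buildBlockedQuery is an abbreviated stand-in for the query assembly in
// GetBlockedIssues. filterSQL holds only fixed fragments like
// "AND i.priority = ?"; user values are bound separately as parameters.
func buildBlockedQuery(filterSQL string) string {
	const base = `SELECT i.* FROM issues i WHERE i.status != 'closed' %s`
	return fmt.Sprintf(base, filterSQL) //nolint:gosec // G201: placeholders only, no user input
}
```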
```go
package rpc

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync/atomic"
	"time"

	"github.com/steveyegge/beads/internal/types"
	"golang.org/x/mod/semver"
)

// checkVersionCompatibility validates client version against server version.
// Returns an error if the versions are incompatible.
func (s *Server) checkVersionCompatibility(clientVersion string) error {
	// Allow empty client version (old clients before this feature)
	if clientVersion == "" {
		return nil
	}

	// Normalize versions to semver format (add 'v' prefix if missing)
	serverVer := ServerVersion
	if !strings.HasPrefix(serverVer, "v") {
		serverVer = "v" + serverVer
	}
	clientVer := clientVersion
	if !strings.HasPrefix(clientVer, "v") {
		clientVer = "v" + clientVer
	}

	// Validate versions are valid semver
	if !semver.IsValid(serverVer) || !semver.IsValid(clientVer) {
		// If either version is invalid, allow connection (dev builds, etc.)
		return nil
	}

	// Extract major versions
	serverMajor := semver.Major(serverVer)
	clientMajor := semver.Major(clientVer)

	// Major version must match
	if serverMajor != clientMajor {
		cmp := semver.Compare(serverVer, clientVer)
		if cmp < 0 {
			// Daemon is older - needs upgrade
			return fmt.Errorf("incompatible major versions: client %s, daemon %s. Daemon is older; upgrade and restart daemon: 'bd daemon --stop && bd daemon'",
				clientVersion, ServerVersion)
		}
		// Daemon is newer - client needs upgrade
		return fmt.Errorf("incompatible major versions: client %s, daemon %s. Client is older; upgrade the bd CLI to match the daemon's major version",
			clientVersion, ServerVersion)
	}

	// Compare full versions - daemon must be >= client (bd-ckvw: strict minor version gating).
	// This prevents stale daemons from serving requests with old schema assumptions.
	cmp := semver.Compare(serverVer, clientVer)
	if cmp < 0 {
		// Server is older than client - refuse connection.
		// Extract minor versions for a clearer error message.
		serverMinor := semver.MajorMinor(serverVer)
		clientMinor := semver.MajorMinor(clientVer)

		if serverMinor != clientMinor {
			// Minor version mismatch - schema may be incompatible
			return fmt.Errorf("version mismatch: client v%s requires daemon upgrade (daemon is v%s). The client may expect schema changes not present in this daemon version. Run: bd daemons killall",
				clientVersion, ServerVersion)
		}

		// Patch version difference - usually safe, but warn
		return fmt.Errorf("version mismatch: daemon v%s is older than client v%s. Upgrade and restart daemon: bd daemons killall",
			ServerVersion, clientVersion)
	}

	// Client is same version or older - OK (daemon supports backward compat within major version)
	return nil
}

// validateDatabaseBinding validates that the client is connecting to the correct daemon.
// Returns an error if ExpectedDB is set and doesn't match the daemon's database path.
func (s *Server) validateDatabaseBinding(req *Request) error {
	// If client doesn't specify ExpectedDB, allow but log warning (old clients)
	if req.ExpectedDB == "" {
		// Log warning for audit trail
		fmt.Fprintf(os.Stderr, "Warning: Client request without database binding validation (old client or missing ExpectedDB)\n")
		return nil
	}

	// Local daemon always uses single storage
	daemonDB := s.storage.Path()

	// Normalize both paths for comparison (resolve symlinks, clean paths)
	expectedPath, err := filepath.EvalSymlinks(req.ExpectedDB)
	if err != nil {
		// If we can't resolve expected path, use it as-is
		expectedPath = filepath.Clean(req.ExpectedDB)
	}
	daemonPath, err := filepath.EvalSymlinks(daemonDB)
	if err != nil {
		// If we can't resolve daemon path, use it as-is
		daemonPath = filepath.Clean(daemonDB)
	}

	// Compare paths
	if expectedPath != daemonPath {
		return fmt.Errorf("database mismatch: client expects %s but daemon serves %s. Wrong daemon connection - check socket path",
			req.ExpectedDB, daemonDB)
	}

	return nil
}

func (s *Server) handleRequest(req *Request) Response {
	// Track request timing
	start := time.Now()

	// Defer metrics recording to ensure it always happens
	defer func() {
		latency := time.Since(start)
		s.metrics.RecordRequest(req.Operation, latency)
	}()

	// Validate database binding (skip for health/metrics to allow diagnostics)
	if req.Operation != OpHealth && req.Operation != OpMetrics {
		if err := s.validateDatabaseBinding(req); err != nil {
			s.metrics.RecordError(req.Operation)
			return Response{
				Success: false,
				Error:   err.Error(),
			}
		}
	}

	// Check version compatibility (skip for ping/health to allow version checks)
	if req.Operation != OpPing && req.Operation != OpHealth {
		if err := s.checkVersionCompatibility(req.ClientVersion); err != nil {
			s.metrics.RecordError(req.Operation)
			return Response{
				Success: false,
				Error:   err.Error(),
			}
		}
	}

	// Check for stale JSONL and auto-import if needed (bd-160).
	// Skip for write operations that will trigger export anyway.
	// Skip for the import operation itself to avoid recursion.
	if req.Operation != OpPing && req.Operation != OpHealth && req.Operation != OpMetrics &&
		req.Operation != OpImport && req.Operation != OpExport {
		if err := s.checkAndAutoImportIfStale(req); err != nil {
			// Log warning but continue - don't fail the request
			fmt.Fprintf(os.Stderr, "Warning: staleness check failed: %v\n", err)
		}
	}

	// Update last activity timestamp
	s.lastActivityTime.Store(time.Now())

	var resp Response
	switch req.Operation {
	case OpPing:
		resp = s.handlePing(req)
	case OpStatus:
		resp = s.handleStatus(req)
	case OpHealth:
		resp = s.handleHealth(req)
	case OpMetrics:
		resp = s.handleMetrics(req)
	case OpCreate:
		resp = s.handleCreate(req)
	case OpUpdate:
		resp = s.handleUpdate(req)
	case OpClose:
		resp = s.handleClose(req)
	case OpDelete:
		resp = s.handleDelete(req)
	case OpList:
		resp = s.handleList(req)
	case OpCount:
		resp = s.handleCount(req)
	case OpShow:
		resp = s.handleShow(req)
	case OpResolveID:
		resp = s.handleResolveID(req)
	case OpReady:
		resp = s.handleReady(req)
	case OpBlocked:
		resp = s.handleBlocked(req)
	case OpStale:
		resp = s.handleStale(req)
	case OpStats:
		resp = s.handleStats(req)
	case OpDepAdd:
		resp = s.handleDepAdd(req)
	case OpDepRemove:
		resp = s.handleDepRemove(req)
	case OpLabelAdd:
		resp = s.handleLabelAdd(req)
	case OpLabelRemove:
		resp = s.handleLabelRemove(req)
	case OpCommentList:
		resp = s.handleCommentList(req)
	case OpCommentAdd:
		resp = s.handleCommentAdd(req)
	case OpBatch:
		resp = s.handleBatch(req)

	case OpCompact:
		resp = s.handleCompact(req)
	case OpCompactStats:
		resp = s.handleCompactStats(req)
	case OpExport:
		resp = s.handleExport(req)
	case OpImport:
		resp = s.handleImport(req)
	case OpEpicStatus:
		resp = s.handleEpicStatus(req)
	case OpGetMutations:
		resp = s.handleGetMutations(req)
	case OpGetMoleculeProgress:
		resp = s.handleGetMoleculeProgress(req)
	case OpGetWorkerStatus:
		resp = s.handleGetWorkerStatus(req)
	case OpShutdown:
		resp = s.handleShutdown(req)
	// Gate operations (bd-likt)
	case OpGateCreate:
		resp = s.handleGateCreate(req)
	case OpGateList:
		resp = s.handleGateList(req)
	case OpGateShow:
		resp = s.handleGateShow(req)
	case OpGateClose:
		resp = s.handleGateClose(req)
	case OpGateWait:
		resp = s.handleGateWait(req)
	default:
		s.metrics.RecordError(req.Operation)
		return Response{
			Success: false,
			Error:   fmt.Sprintf("unknown operation: %s", req.Operation),
		}
	}

	// Record error if request failed
	if !resp.Success {
		s.metrics.RecordError(req.Operation)
	}

	return resp
}

// Adapter helpers
func (s *Server) reqCtx(_ *Request) context.Context {
	return context.Background()
}

func (s *Server) reqActor(req *Request) string {
	if req != nil && req.Actor != "" {
		return req.Actor
	}
	return "daemon"
}

// Handler implementations

func (s *Server) handlePing(_ *Request) Response {
	data, _ := json.Marshal(PingResponse{
		Message: "pong",
		Version: ServerVersion,
	})
	return Response{
		Success: true,
		Data:    data,
	}
}

func (s *Server) handleStatus(_ *Request) Response {
	// Get last activity timestamp
	lastActivity := s.lastActivityTime.Load().(time.Time)

	// Check for exclusive lock
	lockActive := false
	lockHolder := ""
	if s.workspacePath != "" {
		if skip, holder, _ := types.ShouldSkipDatabase(s.workspacePath); skip {
			lockActive = true
			lockHolder = holder
		}
	}

	// Read config under lock
	s.mu.RLock()
	autoCommit := s.autoCommit
	autoPush := s.autoPush
	autoPull := s.autoPull
	localMode := s.localMode
	syncInterval := s.syncInterval
	daemonMode := s.daemonMode
	s.mu.RUnlock()

	statusResp := StatusResponse{
		Version:             ServerVersion,
		WorkspacePath:       s.workspacePath,
		DatabasePath:        s.dbPath,
		SocketPath:          s.socketPath,
		PID:                 os.Getpid(),
		UptimeSeconds:       time.Since(s.startTime).Seconds(),
		LastActivityTime:    lastActivity.Format(time.RFC3339),
		ExclusiveLockActive: lockActive,
		ExclusiveLockHolder: lockHolder,
		AutoCommit:          autoCommit,
		AutoPush:            autoPush,
		AutoPull:            autoPull,
		LocalMode:           localMode,
		SyncInterval:        syncInterval,
		DaemonMode:          daemonMode,
	}

	data, _ := json.Marshal(statusResp)
	return Response{
		Success: true,
		Data:    data,
	}
}

func (s *Server) handleHealth(req *Request) Response {
	start := time.Now()

	// Get memory stats for health response
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	store := s.storage

	healthCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	status := "healthy"
	dbError := ""

	_, pingErr := store.GetStatistics(healthCtx)
	dbResponseMs := time.Since(start).Seconds() * 1000

	if pingErr != nil {
		status = statusUnhealthy
		dbError = pingErr.Error()
	} else if dbResponseMs > 500 {
		status = "degraded"
	}

	// Check version compatibility
	compatible := true
	if req.ClientVersion != "" {
		if err := s.checkVersionCompatibility(req.ClientVersion); err != nil {
			compatible = false
		}
	}

	health := HealthResponse{
		Status:         status,
		Version:        ServerVersion,
		ClientVersion:  req.ClientVersion,
		Compatible:     compatible,
		Uptime:         time.Since(s.startTime).Seconds(),
		DBResponseTime: dbResponseMs,
		ActiveConns:    atomic.LoadInt32(&s.activeConns),
		MaxConns:       s.maxConns,
		MemoryAllocMB:  m.Alloc / 1024 / 1024,
	}

	if dbError != "" {
		health.Error = dbError
	}

	data, _ := json.Marshal(health)
	return Response{
		Success: status != "unhealthy",
		Data:    data,
		Error:   dbError,
	}
}

func (s *Server) handleMetrics(_ *Request) Response {
	snapshot := s.metrics.Snapshot(
		int(atomic.LoadInt32(&s.activeConns)),
	)

	data, _ := json.Marshal(snapshot)
	return Response{
		Success: true,
		Data:    data,
	}
}

func (s *Server) handleGetWorkerStatus(req *Request) Response {
	ctx := s.reqCtx(req)

	// Parse optional args
	var args GetWorkerStatusArgs
	if len(req.Args) > 0 {
		if err := json.Unmarshal(req.Args, &args); err != nil {
			return Response{
				Success: false,
				Error:   fmt.Sprintf("invalid args: %v", err),
			}
		}
	}

	// Build filter: find all in_progress issues with assignees
	filter := types.IssueFilter{
		Status: func() *types.Status { s := types.StatusInProgress; return &s }(),
	}
	if args.Assignee != "" {
		filter.Assignee = &args.Assignee
	}

	// Get all in_progress issues (potential workers)
	issues, err := s.storage.SearchIssues(ctx, "", filter)
	if err != nil {
		return Response{
			Success: false,
			Error:   fmt.Sprintf("failed to search issues: %v", err),
		}
	}

	var workers []WorkerStatus
	for _, issue := range issues {
		// Skip issues without assignees
		if issue.Assignee == "" {
			continue
		}

		worker := WorkerStatus{
			Assignee:     issue.Assignee,
			LastActivity: issue.UpdatedAt.Format(time.RFC3339),
			Status:       string(issue.Status),
		}

		// Check if this issue is a child of a molecule/epic (has parent-child dependency)
		deps, err := s.storage.GetDependencyRecords(ctx, issue.ID)
		if err == nil {
			for _, dep := range deps {
				if dep.Type == types.DepParentChild {
					// This issue is a child - get the parent molecule
					parentIssue, err := s.storage.GetIssue(ctx, dep.DependsOnID)
					if err == nil && parentIssue != nil {
						worker.MoleculeID = parentIssue.ID
						worker.MoleculeTitle = parentIssue.Title
						worker.StepID = issue.ID
						worker.StepTitle = issue.Title

						// Count total steps and determine the current step number
						// by getting all children of the molecule
						children, err := s.storage.GetDependents(ctx, parentIssue.ID)
						if err == nil {
							// Filter to only parent-child dependencies
							var steps []*types.Issue
							for _, child := range children {
								childDeps, err := s.storage.GetDependencyRecords(ctx, child.ID)
								if err == nil {
									for _, childDep := range childDeps {
										if childDep.Type == types.DepParentChild && childDep.DependsOnID == parentIssue.ID {
											steps = append(steps, child)
											break
										}
									}
								}
							}
							worker.TotalSteps = len(steps)

							// Find current step number (1-indexed)
							for i, step := range steps {
								if step.ID == issue.ID {
									worker.CurrentStep = i + 1
									break
								}
							}
						}
					}
					break // Found the parent, no need to check other deps
				}
			}
		}

		workers = append(workers, worker)
	}

	resp := GetWorkerStatusResponse{
		Workers: workers,
	}

	data, _ := json.Marshal(resp)
	return Response{
		Success: true,
		Data:    data,
	}
}
```