feat(rpc): add GetWorkerStatus RPC endpoint (bd-l13p)
New RPC endpoint to get all workers and their current molecule/step in one call. Returns: assignee, moleculeID, moleculeTitle, currentStep, totalSteps, stepTitle, lastActivity, status. Enables activity feed TUI to show worker state without multiple round trips. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -422,6 +422,21 @@ func (c *Client) GateWait(args *GateWaitArgs) (*Response, error) {
|
||||
return c.Execute(OpGateWait, args)
|
||||
}
|
||||
|
||||
// GetWorkerStatus retrieves worker status via the daemon
|
||||
func (c *Client) GetWorkerStatus(args *GetWorkerStatusArgs) (*GetWorkerStatusResponse, error) {
|
||||
resp, err := c.Execute(OpGetWorkerStatus, args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result GetWorkerStatusResponse
|
||||
if err := json.Unmarshal(resp.Data, &result); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal worker status response: %w", err)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// cleanupStaleDaemonArtifacts removes stale daemon.pid file when socket is missing and lock is free.
|
||||
// This prevents stale artifacts from accumulating after daemon crashes.
|
||||
// Only removes pid file - lock file is managed by OS (released on process exit).
|
||||
|
||||
@@ -38,6 +38,7 @@ const (
|
||||
OpGetMutations = "get_mutations"
|
||||
OpShutdown = "shutdown"
|
||||
OpDelete = "delete"
|
||||
OpGetWorkerStatus = "get_worker_status"
|
||||
|
||||
// Gate operations (bd-likt)
|
||||
OpGateCreate = "gate_create"
|
||||
@@ -464,3 +465,27 @@ type GateWaitArgs struct {
|
||||
type GateWaitResult struct {
|
||||
AddedCount int `json:"added_count"` // Number of new waiters added
|
||||
}
|
||||
|
||||
// GetWorkerStatusArgs represents arguments for retrieving worker status
|
||||
type GetWorkerStatusArgs struct {
|
||||
// Assignee filters to a specific worker (optional, empty = all workers)
|
||||
Assignee string `json:"assignee,omitempty"`
|
||||
}
|
||||
|
||||
// WorkerStatus represents the status of a single worker and their current work
|
||||
type WorkerStatus struct {
|
||||
Assignee string `json:"assignee"` // Worker identifier
|
||||
MoleculeID string `json:"molecule_id,omitempty"` // Parent molecule/epic ID (if working on a step)
|
||||
MoleculeTitle string `json:"molecule_title,omitempty"` // Parent molecule/epic title
|
||||
CurrentStep int `json:"current_step,omitempty"` // Current step number (1-indexed)
|
||||
TotalSteps int `json:"total_steps,omitempty"` // Total number of steps in molecule
|
||||
StepID string `json:"step_id,omitempty"` // Current step issue ID
|
||||
StepTitle string `json:"step_title,omitempty"` // Current step issue title
|
||||
LastActivity string `json:"last_activity"` // ISO 8601 timestamp of last update
|
||||
Status string `json:"status"` // Current work status (in_progress, blocked, etc.)
|
||||
}
|
||||
|
||||
// GetWorkerStatusResponse is the response for get_worker_status operation
|
||||
type GetWorkerStatusResponse struct {
|
||||
Workers []WorkerStatus `json:"workers"`
|
||||
}
|
||||
|
||||
@@ -219,6 +219,8 @@ func (s *Server) handleRequest(req *Request) Response {
|
||||
resp = s.handleEpicStatus(req)
|
||||
case OpGetMutations:
|
||||
resp = s.handleGetMutations(req)
|
||||
case OpGetWorkerStatus:
|
||||
resp = s.handleGetWorkerStatus(req)
|
||||
case OpShutdown:
|
||||
resp = s.handleShutdown(req)
|
||||
// Gate operations (bd-likt)
|
||||
@@ -390,3 +392,107 @@ func (s *Server) handleMetrics(_ *Request) Response {
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleGetWorkerStatus(req *Request) Response {
|
||||
ctx := s.reqCtx(req)
|
||||
|
||||
// Parse optional args
|
||||
var args GetWorkerStatusArgs
|
||||
if len(req.Args) > 0 {
|
||||
if err := json.Unmarshal(req.Args, &args); err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("invalid args: %v", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Build filter: find all in_progress issues with assignees
|
||||
filter := types.IssueFilter{
|
||||
Status: func() *types.Status { s := types.StatusInProgress; return &s }(),
|
||||
}
|
||||
if args.Assignee != "" {
|
||||
filter.Assignee = &args.Assignee
|
||||
}
|
||||
|
||||
// Get all in_progress issues (potential workers)
|
||||
issues, err := s.storage.SearchIssues(ctx, "", filter)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to search issues: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
var workers []WorkerStatus
|
||||
for _, issue := range issues {
|
||||
// Skip issues without assignees
|
||||
if issue.Assignee == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
worker := WorkerStatus{
|
||||
Assignee: issue.Assignee,
|
||||
LastActivity: issue.UpdatedAt.Format(time.RFC3339),
|
||||
Status: string(issue.Status),
|
||||
}
|
||||
|
||||
// Check if this issue is a child of a molecule/epic (has parent-child dependency)
|
||||
deps, err := s.storage.GetDependencyRecords(ctx, issue.ID)
|
||||
if err == nil {
|
||||
for _, dep := range deps {
|
||||
if dep.Type == types.DepParentChild {
|
||||
// This issue is a child - get the parent molecule
|
||||
parentIssue, err := s.storage.GetIssue(ctx, dep.DependsOnID)
|
||||
if err == nil && parentIssue != nil {
|
||||
worker.MoleculeID = parentIssue.ID
|
||||
worker.MoleculeTitle = parentIssue.Title
|
||||
worker.StepID = issue.ID
|
||||
worker.StepTitle = issue.Title
|
||||
|
||||
// Count total steps and determine current step number
|
||||
// by getting all children of the molecule
|
||||
children, err := s.storage.GetDependents(ctx, parentIssue.ID)
|
||||
if err == nil {
|
||||
// Filter to only parent-child dependencies
|
||||
var steps []*types.Issue
|
||||
for _, child := range children {
|
||||
childDeps, err := s.storage.GetDependencyRecords(ctx, child.ID)
|
||||
if err == nil {
|
||||
for _, childDep := range childDeps {
|
||||
if childDep.Type == types.DepParentChild && childDep.DependsOnID == parentIssue.ID {
|
||||
steps = append(steps, child)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
worker.TotalSteps = len(steps)
|
||||
|
||||
// Find current step number (1-indexed)
|
||||
for i, step := range steps {
|
||||
if step.ID == issue.ID {
|
||||
worker.CurrentStep = i + 1
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break // Found the parent, no need to check other deps
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
workers = append(workers, worker)
|
||||
}
|
||||
|
||||
resp := GetWorkerStatusResponse{
|
||||
Workers: workers,
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(resp)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
314
internal/rpc/worker_status_test.go
Normal file
314
internal/rpc/worker_status_test.go
Normal file
@@ -0,0 +1,314 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
func TestGetWorkerStatus_NoWorkers(t *testing.T) {
|
||||
_, client, cleanup := setupTestServer(t)
|
||||
defer cleanup()
|
||||
|
||||
// With no in_progress issues assigned, should return empty list
|
||||
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus failed: %v", err)
|
||||
}
|
||||
|
||||
if len(result.Workers) != 0 {
|
||||
t.Errorf("expected 0 workers, got %d", len(result.Workers))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkerStatus_SingleWorker(t *testing.T) {
|
||||
server, client, cleanup := setupTestServer(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create an in_progress issue with an assignee
|
||||
issue := &types.Issue{
|
||||
ID: "bd-test1",
|
||||
Title: "Test task",
|
||||
Status: types.StatusInProgress,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker1",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, issue, "test"); err != nil {
|
||||
t.Fatalf("failed to create issue: %v", err)
|
||||
}
|
||||
|
||||
// Query worker status
|
||||
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus failed: %v", err)
|
||||
}
|
||||
|
||||
if len(result.Workers) != 1 {
|
||||
t.Fatalf("expected 1 worker, got %d", len(result.Workers))
|
||||
}
|
||||
|
||||
worker := result.Workers[0]
|
||||
if worker.Assignee != "worker1" {
|
||||
t.Errorf("expected assignee 'worker1', got '%s'", worker.Assignee)
|
||||
}
|
||||
if worker.Status != "in_progress" {
|
||||
t.Errorf("expected status 'in_progress', got '%s'", worker.Status)
|
||||
}
|
||||
if worker.LastActivity == "" {
|
||||
t.Error("expected last activity to be set")
|
||||
}
|
||||
// Not part of a molecule, so these should be empty
|
||||
if worker.MoleculeID != "" {
|
||||
t.Errorf("expected empty molecule ID, got '%s'", worker.MoleculeID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkerStatus_WithMolecule(t *testing.T) {
|
||||
server, client, cleanup := setupTestServer(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a molecule (epic)
|
||||
molecule := &types.Issue{
|
||||
ID: "bd-mol1",
|
||||
Title: "Test Molecule",
|
||||
Status: types.StatusOpen,
|
||||
IssueType: types.TypeEpic,
|
||||
Priority: 2,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, molecule, "test"); err != nil {
|
||||
t.Fatalf("failed to create molecule: %v", err)
|
||||
}
|
||||
|
||||
// Create step 1 (completed)
|
||||
step1 := &types.Issue{
|
||||
ID: "bd-step1",
|
||||
Title: "Step 1: Setup",
|
||||
Status: types.StatusClosed,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker1",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
ClosedAt: func() *time.Time { t := time.Now(); return &t }(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, step1, "test"); err != nil {
|
||||
t.Fatalf("failed to create step1: %v", err)
|
||||
}
|
||||
|
||||
// Create step 2 (current step - in progress)
|
||||
step2 := &types.Issue{
|
||||
ID: "bd-step2",
|
||||
Title: "Step 2: Implementation",
|
||||
Status: types.StatusInProgress,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker1",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, step2, "test"); err != nil {
|
||||
t.Fatalf("failed to create step2: %v", err)
|
||||
}
|
||||
|
||||
// Create step 3 (pending)
|
||||
step3 := &types.Issue{
|
||||
ID: "bd-step3",
|
||||
Title: "Step 3: Testing",
|
||||
Status: types.StatusOpen,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, step3, "test"); err != nil {
|
||||
t.Fatalf("failed to create step3: %v", err)
|
||||
}
|
||||
|
||||
// Add parent-child dependencies (steps depend on molecule)
|
||||
for _, stepID := range []string{"bd-step1", "bd-step2", "bd-step3"} {
|
||||
dep := &types.Dependency{
|
||||
IssueID: stepID,
|
||||
DependsOnID: "bd-mol1",
|
||||
Type: types.DepParentChild,
|
||||
CreatedAt: time.Now(),
|
||||
CreatedBy: "test",
|
||||
}
|
||||
if err := server.storage.AddDependency(ctx, dep, "test"); err != nil {
|
||||
t.Fatalf("failed to add dependency for %s: %v", stepID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query worker status
|
||||
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus failed: %v", err)
|
||||
}
|
||||
|
||||
if len(result.Workers) != 1 {
|
||||
t.Fatalf("expected 1 worker (only in_progress issues), got %d", len(result.Workers))
|
||||
}
|
||||
|
||||
worker := result.Workers[0]
|
||||
if worker.Assignee != "worker1" {
|
||||
t.Errorf("expected assignee 'worker1', got '%s'", worker.Assignee)
|
||||
}
|
||||
if worker.MoleculeID != "bd-mol1" {
|
||||
t.Errorf("expected molecule ID 'bd-mol1', got '%s'", worker.MoleculeID)
|
||||
}
|
||||
if worker.MoleculeTitle != "Test Molecule" {
|
||||
t.Errorf("expected molecule title 'Test Molecule', got '%s'", worker.MoleculeTitle)
|
||||
}
|
||||
if worker.StepID != "bd-step2" {
|
||||
t.Errorf("expected step ID 'bd-step2', got '%s'", worker.StepID)
|
||||
}
|
||||
if worker.StepTitle != "Step 2: Implementation" {
|
||||
t.Errorf("expected step title 'Step 2: Implementation', got '%s'", worker.StepTitle)
|
||||
}
|
||||
if worker.TotalSteps != 3 {
|
||||
t.Errorf("expected 3 total steps, got %d", worker.TotalSteps)
|
||||
}
|
||||
// Note: CurrentStep ordering depends on how GetDependents orders results
|
||||
// Just verify it's set
|
||||
if worker.CurrentStep < 1 || worker.CurrentStep > 3 {
|
||||
t.Errorf("expected current step between 1 and 3, got %d", worker.CurrentStep)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkerStatus_FilterByAssignee(t *testing.T) {
|
||||
server, client, cleanup := setupTestServer(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create issues for two different workers
|
||||
issue1 := &types.Issue{
|
||||
ID: "bd-test1",
|
||||
Title: "Task for worker1",
|
||||
Status: types.StatusInProgress,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker1",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
issue2 := &types.Issue{
|
||||
ID: "bd-test2",
|
||||
Title: "Task for worker2",
|
||||
Status: types.StatusInProgress,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker2",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := server.storage.CreateIssue(ctx, issue1, "test"); err != nil {
|
||||
t.Fatalf("failed to create issue1: %v", err)
|
||||
}
|
||||
if err := server.storage.CreateIssue(ctx, issue2, "test"); err != nil {
|
||||
t.Fatalf("failed to create issue2: %v", err)
|
||||
}
|
||||
|
||||
// Query all workers
|
||||
allResult, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus (all) failed: %v", err)
|
||||
}
|
||||
|
||||
if len(allResult.Workers) != 2 {
|
||||
t.Errorf("expected 2 workers, got %d", len(allResult.Workers))
|
||||
}
|
||||
|
||||
// Query specific worker
|
||||
filteredResult, err := client.GetWorkerStatus(&GetWorkerStatusArgs{Assignee: "worker1"})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus (filtered) failed: %v", err)
|
||||
}
|
||||
|
||||
if len(filteredResult.Workers) != 1 {
|
||||
t.Fatalf("expected 1 worker, got %d", len(filteredResult.Workers))
|
||||
}
|
||||
|
||||
if filteredResult.Workers[0].Assignee != "worker1" {
|
||||
t.Errorf("expected assignee 'worker1', got '%s'", filteredResult.Workers[0].Assignee)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetWorkerStatus_OnlyInProgressIssues(t *testing.T) {
|
||||
server, client, cleanup := setupTestServer(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Create issues with different statuses
|
||||
openIssue := &types.Issue{
|
||||
ID: "bd-open",
|
||||
Title: "Open task",
|
||||
Status: types.StatusOpen,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker1",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
inProgressIssue := &types.Issue{
|
||||
ID: "bd-inprog",
|
||||
Title: "In progress task",
|
||||
Status: types.StatusInProgress,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker2",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
closedIssue := &types.Issue{
|
||||
ID: "bd-closed",
|
||||
Title: "Closed task",
|
||||
Status: types.StatusClosed,
|
||||
IssueType: types.TypeTask,
|
||||
Priority: 2,
|
||||
Assignee: "worker3",
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
ClosedAt: func() *time.Time { t := time.Now(); return &t }(),
|
||||
}
|
||||
|
||||
for _, issue := range []*types.Issue{openIssue, inProgressIssue, closedIssue} {
|
||||
if err := server.storage.CreateIssue(ctx, issue, "test"); err != nil {
|
||||
t.Fatalf("failed to create issue %s: %v", issue.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query worker status - should only return in_progress issues
|
||||
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
|
||||
if err != nil {
|
||||
t.Fatalf("GetWorkerStatus failed: %v", err)
|
||||
}
|
||||
|
||||
if len(result.Workers) != 1 {
|
||||
t.Fatalf("expected 1 worker (only in_progress), got %d", len(result.Workers))
|
||||
}
|
||||
|
||||
if result.Workers[0].Assignee != "worker2" {
|
||||
t.Errorf("expected assignee 'worker2', got '%s'", result.Workers[0].Assignee)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user