bd sync: 2025-12-23 23:38:57

This commit is contained in:
Steve Yegge
2025-12-23 23:38:57 -08:00
parent 05e10b6759
commit e67f27c092
80 changed files with 7165 additions and 8490 deletions

View File

@@ -395,6 +395,48 @@ func (c *Client) EpicStatus(args *EpicStatusArgs) (*Response, error) {
return c.Execute(OpEpicStatus, args)
}
// Gate operations (bd-likt)
// GateCreate creates a gate via the daemon
func (c *Client) GateCreate(args *GateCreateArgs) (*Response, error) {
return c.Execute(OpGateCreate, args)
}
// GateList lists gates via the daemon
func (c *Client) GateList(args *GateListArgs) (*Response, error) {
return c.Execute(OpGateList, args)
}
// GateShow shows a gate via the daemon
func (c *Client) GateShow(args *GateShowArgs) (*Response, error) {
return c.Execute(OpGateShow, args)
}
// GateClose closes a gate via the daemon
func (c *Client) GateClose(args *GateCloseArgs) (*Response, error) {
return c.Execute(OpGateClose, args)
}
// GateWait adds waiters to a gate via the daemon
func (c *Client) GateWait(args *GateWaitArgs) (*Response, error) {
return c.Execute(OpGateWait, args)
}
// GetWorkerStatus retrieves worker status via the daemon
func (c *Client) GetWorkerStatus(args *GetWorkerStatusArgs) (*GetWorkerStatusResponse, error) {
resp, err := c.Execute(OpGetWorkerStatus, args)
if err != nil {
return nil, err
}
var result GetWorkerStatusResponse
if err := json.Unmarshal(resp.Data, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal worker status response: %w", err)
}
return &result, nil
}
// cleanupStaleDaemonArtifacts removes stale daemon.pid file when socket is missing and lock is free.
// This prevents stale artifacts from accumulating after daemon crashes.
// Only removes pid file - lock file is managed by OS (released on process exit).

View File

@@ -2,6 +2,7 @@ package rpc
import (
"encoding/json"
"time"
)
// Operation constants for all bd commands
@@ -34,9 +35,18 @@ const (
OpExport = "export"
OpImport = "import"
OpEpicStatus = "epic_status"
OpGetMutations = "get_mutations"
OpShutdown = "shutdown"
OpDelete = "delete"
OpGetMutations = "get_mutations"
OpGetMoleculeProgress = "get_molecule_progress"
OpShutdown = "shutdown"
OpDelete = "delete"
OpGetWorkerStatus = "get_worker_status"
// Gate operations (bd-likt)
OpGateCreate = "gate_create"
OpGateList = "gate_list"
OpGateShow = "gate_show"
OpGateClose = "gate_close"
OpGateWait = "gate_wait"
)
// Request represents an RPC request from client to daemon
@@ -413,3 +423,92 @@ type ImportArgs struct {
type GetMutationsArgs struct {
Since int64 `json:"since"` // Unix timestamp in milliseconds (0 for all recent)
}
// Gate operations (bd-likt)
// GateCreateArgs represents arguments for creating a gate
type GateCreateArgs struct {
Title string `json:"title"`
AwaitType string `json:"await_type"` // gh:run, gh:pr, timer, human, mail
AwaitID string `json:"await_id"` // ID/value for the await type
Timeout time.Duration `json:"timeout"` // Timeout duration
Waiters []string `json:"waiters"` // Mail addresses to notify when gate clears
}
// GateCreateResult represents the result of creating a gate
type GateCreateResult struct {
ID string `json:"id"` // Created gate ID
}
// GateListArgs represents arguments for listing gates
type GateListArgs struct {
All bool `json:"all"` // Include closed gates
}
// GateShowArgs represents arguments for showing a gate
type GateShowArgs struct {
ID string `json:"id"` // Gate ID (partial or full)
}
// GateCloseArgs represents arguments for closing a gate
type GateCloseArgs struct {
ID string `json:"id"` // Gate ID (partial or full)
Reason string `json:"reason,omitempty"` // Close reason
}
// GateWaitArgs represents arguments for adding waiters to a gate
type GateWaitArgs struct {
ID string `json:"id"` // Gate ID (partial or full)
Waiters []string `json:"waiters"` // Additional waiters to add
}
// GateWaitResult represents the result of adding waiters
type GateWaitResult struct {
AddedCount int `json:"added_count"` // Number of new waiters added
}
// GetWorkerStatusArgs represents arguments for retrieving worker status
type GetWorkerStatusArgs struct {
// Assignee filters to a specific worker (optional, empty = all workers)
Assignee string `json:"assignee,omitempty"`
}
// WorkerStatus represents the status of a single worker and their current work
type WorkerStatus struct {
Assignee string `json:"assignee"` // Worker identifier
MoleculeID string `json:"molecule_id,omitempty"` // Parent molecule/epic ID (if working on a step)
MoleculeTitle string `json:"molecule_title,omitempty"` // Parent molecule/epic title
CurrentStep int `json:"current_step,omitempty"` // Current step number (1-indexed)
TotalSteps int `json:"total_steps,omitempty"` // Total number of steps in molecule
StepID string `json:"step_id,omitempty"` // Current step issue ID
StepTitle string `json:"step_title,omitempty"` // Current step issue title
LastActivity string `json:"last_activity"` // ISO 8601 timestamp of last update
Status string `json:"status"` // Current work status (in_progress, blocked, etc.)
}
// GetWorkerStatusResponse is the response for get_worker_status operation
type GetWorkerStatusResponse struct {
Workers []WorkerStatus `json:"workers"`
}
// GetMoleculeProgressArgs represents arguments for the get_molecule_progress operation
type GetMoleculeProgressArgs struct {
MoleculeID string `json:"molecule_id"` // The ID of the molecule (parent issue)
}
// MoleculeStep represents a single step within a molecule
type MoleculeStep struct {
ID string `json:"id"`
Title string `json:"title"`
Status string `json:"status"` // "done", "current", "ready", "blocked"
StartTime *string `json:"start_time"` // ISO 8601 timestamp when step was created
CloseTime *string `json:"close_time"` // ISO 8601 timestamp when step was closed (if done)
}
// MoleculeProgress represents the progress of a molecule (parent issue with steps)
type MoleculeProgress struct {
MoleculeID string `json:"molecule_id"`
Title string `json:"title"`
Assignee string `json:"assignee"`
Steps []MoleculeStep `json:"steps"`
}

View File

@@ -1,6 +1,7 @@
package rpc
import (
"context"
"encoding/json"
"fmt"
"net"
@@ -10,6 +11,7 @@ import (
"time"
"github.com/steveyegge/beads/internal/storage"
"github.com/steveyegge/beads/internal/types"
)
// ServerVersion is the version of this RPC server
@@ -80,6 +82,8 @@ const (
type MutationEvent struct {
Type string // One of the Mutation* constants
IssueID string // e.g., "bd-42"
Title string // Issue title for display context (may be empty for some operations)
Assignee string // Issue assignee for display context (may be empty)
Timestamp time.Time
// Optional metadata for richer events (used by status, bonded, etc.)
OldStatus string `json:"old_status,omitempty"` // Previous status (for status events)
@@ -138,10 +142,13 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
// emitMutation sends a mutation event to the daemon's event-driven loop.
// Non-blocking: drops event if channel is full (sync will happen eventually).
// Also stores in recent mutations buffer for polling.
func (s *Server) emitMutation(eventType, issueID string) {
// Title and assignee provide context for activity feeds; pass empty strings if unknown.
func (s *Server) emitMutation(eventType, issueID, title, assignee string) {
s.emitRichMutation(MutationEvent{
Type: eventType,
IssueID: issueID,
Type: eventType,
IssueID: issueID,
Title: title,
Assignee: assignee,
})
}
@@ -227,3 +234,120 @@ func (s *Server) handleGetMutations(req *Request) Response {
Data: data,
}
}
// handleGetMoleculeProgress handles the get_molecule_progress RPC operation
// Returns detailed progress for a molecule (parent issue with child steps)
func (s *Server) handleGetMoleculeProgress(req *Request) Response {
var args GetMoleculeProgressArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid arguments: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
// Get the molecule (parent issue)
molecule, err := store.GetIssue(ctx, args.MoleculeID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to get molecule: %v", err),
}
}
if molecule == nil {
return Response{
Success: false,
Error: fmt.Sprintf("molecule not found: %s", args.MoleculeID),
}
}
// Get children (issues that have parent-child dependency on this molecule)
var children []*types.IssueWithDependencyMetadata
if sqliteStore, ok := store.(interface {
GetDependentsWithMetadata(ctx context.Context, issueID string) ([]*types.IssueWithDependencyMetadata, error)
}); ok {
allDependents, err := sqliteStore.GetDependentsWithMetadata(ctx, args.MoleculeID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to get molecule children: %v", err),
}
}
// Filter for parent-child relationships only
for _, dep := range allDependents {
if dep.DependencyType == types.DepParentChild {
children = append(children, dep)
}
}
}
// Get blocked issue IDs for status computation
blockedIDs := make(map[string]bool)
if sqliteStore, ok := store.(interface {
GetBlockedIssueIDs(ctx context.Context) ([]string, error)
}); ok {
ids, err := sqliteStore.GetBlockedIssueIDs(ctx)
if err == nil {
for _, id := range ids {
blockedIDs[id] = true
}
}
}
// Build steps from children
steps := make([]MoleculeStep, 0, len(children))
for _, child := range children {
step := MoleculeStep{
ID: child.ID,
Title: child.Title,
}
// Compute step status
switch child.Status {
case types.StatusClosed:
step.Status = "done"
case types.StatusInProgress:
step.Status = "current"
default: // open, blocked, etc.
if blockedIDs[child.ID] {
step.Status = "blocked"
} else {
step.Status = "ready"
}
}
// Set timestamps
startTime := child.CreatedAt.Format(time.RFC3339)
step.StartTime = &startTime
if child.ClosedAt != nil {
closeTime := child.ClosedAt.Format(time.RFC3339)
step.CloseTime = &closeTime
}
steps = append(steps, step)
}
progress := MoleculeProgress{
MoleculeID: molecule.ID,
Title: molecule.Title,
Assignee: molecule.Assignee,
Steps: steps,
}
data, _ := json.Marshal(progress)
return Response{
Success: true,
Data: data,
}
}

View File

@@ -350,7 +350,7 @@ func (s *Server) handleCreate(req *Request) Response {
}
// Emit mutation event for event-driven daemon
s.emitMutation(MutationCreate, issue.ID)
s.emitMutation(MutationCreate, issue.ID, issue.Title, issue.Assignee)
data, _ := json.Marshal(issue)
return Response{
@@ -470,11 +470,13 @@ func (s *Server) handleUpdate(req *Request) Response {
s.emitRichMutation(MutationEvent{
Type: MutationStatus,
IssueID: updateArgs.ID,
Title: issue.Title,
Assignee: issue.Assignee,
OldStatus: string(issue.Status),
NewStatus: *updateArgs.Status,
})
} else {
s.emitMutation(MutationUpdate, updateArgs.ID)
s.emitMutation(MutationUpdate, updateArgs.ID, issue.Title, issue.Assignee)
}
}
@@ -544,6 +546,8 @@ func (s *Server) handleClose(req *Request) Response {
s.emitRichMutation(MutationEvent{
Type: MutationStatus,
IssueID: closeArgs.ID,
Title: issue.Title,
Assignee: issue.Assignee,
OldStatus: oldStatus,
NewStatus: "closed",
})
@@ -640,7 +644,7 @@ func (s *Server) handleDelete(req *Request) Response {
}
// Emit mutation event for event-driven daemon
s.emitMutation(MutationDelete, issueID)
s.emitMutation(MutationDelete, issueID, issue.Title, issue.Assignee)
deletedCount++
}
@@ -1373,3 +1377,341 @@ func (s *Server) handleEpicStatus(req *Request) Response {
Data: data,
}
}
// Gate handlers (bd-likt)
func (s *Server) handleGateCreate(req *Request) Response {
var args GateCreateArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid gate create args: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
now := time.Now()
// Create gate issue
gate := &types.Issue{
Title: args.Title,
IssueType: types.TypeGate,
Status: types.StatusOpen,
Priority: 1, // Gates are typically high priority
Assignee: "deacon/",
Wisp: true, // Gates are wisps (ephemeral)
AwaitType: args.AwaitType,
AwaitID: args.AwaitID,
Timeout: args.Timeout,
Waiters: args.Waiters,
CreatedAt: now,
UpdatedAt: now,
}
gate.ContentHash = gate.ComputeContentHash()
if err := store.CreateIssue(ctx, gate, s.reqActor(req)); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to create gate: %v", err),
}
}
// Emit mutation event
s.emitMutation(MutationCreate, gate.ID, gate.Title, gate.Assignee)
data, _ := json.Marshal(GateCreateResult{ID: gate.ID})
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleGateList(req *Request) Response {
var args GateListArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid gate list args: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
// Build filter for gates
gateType := types.TypeGate
filter := types.IssueFilter{
IssueType: &gateType,
}
if !args.All {
openStatus := types.StatusOpen
filter.Status = &openStatus
}
gates, err := store.SearchIssues(ctx, "", filter)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to list gates: %v", err),
}
}
data, _ := json.Marshal(gates)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleGateShow(req *Request) Response {
var args GateShowArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid gate show args: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
// Resolve partial ID
gateID, err := utils.ResolvePartialID(ctx, store, args.ID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to resolve gate ID: %v", err),
}
}
gate, err := store.GetIssue(ctx, gateID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to get gate: %v", err),
}
}
if gate == nil {
return Response{
Success: false,
Error: fmt.Sprintf("gate %s not found", gateID),
}
}
if gate.IssueType != types.TypeGate {
return Response{
Success: false,
Error: fmt.Sprintf("%s is not a gate (type: %s)", gateID, gate.IssueType),
}
}
data, _ := json.Marshal(gate)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleGateClose(req *Request) Response {
var args GateCloseArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid gate close args: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
// Resolve partial ID
gateID, err := utils.ResolvePartialID(ctx, store, args.ID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to resolve gate ID: %v", err),
}
}
// Verify it's a gate
gate, err := store.GetIssue(ctx, gateID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to get gate: %v", err),
}
}
if gate == nil {
return Response{
Success: false,
Error: fmt.Sprintf("gate %s not found", gateID),
}
}
if gate.IssueType != types.TypeGate {
return Response{
Success: false,
Error: fmt.Sprintf("%s is not a gate (type: %s)", gateID, gate.IssueType),
}
}
reason := args.Reason
if reason == "" {
reason = "Gate closed"
}
oldStatus := string(gate.Status)
if err := store.CloseIssue(ctx, gateID, reason, s.reqActor(req)); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to close gate: %v", err),
}
}
// Emit rich status change event
s.emitRichMutation(MutationEvent{
Type: MutationStatus,
IssueID: gateID,
OldStatus: oldStatus,
NewStatus: "closed",
})
closedGate, _ := store.GetIssue(ctx, gateID)
data, _ := json.Marshal(closedGate)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleGateWait(req *Request) Response {
var args GateWaitArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid gate wait args: %v", err),
}
}
store := s.storage
if store == nil {
return Response{
Success: false,
Error: "storage not available",
}
}
ctx := s.reqCtx(req)
// Resolve partial ID
gateID, err := utils.ResolvePartialID(ctx, store, args.ID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to resolve gate ID: %v", err),
}
}
// Get existing gate
gate, err := store.GetIssue(ctx, gateID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to get gate: %v", err),
}
}
if gate == nil {
return Response{
Success: false,
Error: fmt.Sprintf("gate %s not found", gateID),
}
}
if gate.IssueType != types.TypeGate {
return Response{
Success: false,
Error: fmt.Sprintf("%s is not a gate (type: %s)", gateID, gate.IssueType),
}
}
if gate.Status == types.StatusClosed {
return Response{
Success: false,
Error: fmt.Sprintf("gate %s is already closed", gateID),
}
}
// Add new waiters (avoiding duplicates)
waiterSet := make(map[string]bool)
for _, w := range gate.Waiters {
waiterSet[w] = true
}
newWaiters := []string{}
for _, addr := range args.Waiters {
if !waiterSet[addr] {
newWaiters = append(newWaiters, addr)
waiterSet[addr] = true
}
}
addedCount := len(newWaiters)
if addedCount > 0 {
// Update waiters using SQLite directly
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
if !ok {
return Response{
Success: false,
Error: "gate wait requires SQLite storage",
}
}
allWaiters := append(gate.Waiters, newWaiters...)
waitersJSON, _ := json.Marshal(allWaiters)
// Use raw SQL to update the waiters field
_, err = sqliteStore.UnderlyingDB().ExecContext(ctx, `UPDATE issues SET waiters = ?, updated_at = ? WHERE id = ?`,
string(waitersJSON), time.Now(), gateID)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to add waiters: %v", err),
}
}
// Emit mutation event
s.emitMutation(MutationUpdate, gateID, gate.Title, gate.Assignee)
}
data, _ := json.Marshal(GateWaitResult{AddedCount: addedCount})
return Response{
Success: true,
Data: data,
}
}

View File

@@ -41,7 +41,8 @@ func (s *Server) handleDepAdd(req *Request) Response {
}
// Emit mutation event for event-driven daemon
s.emitMutation(MutationUpdate, depArgs.FromID)
// Title/assignee empty for dependency operations (would require extra lookup)
s.emitMutation(MutationUpdate, depArgs.FromID, "", "")
return Response{Success: true}
}
@@ -73,7 +74,8 @@ func (s *Server) handleSimpleStoreOp(req *Request, argsPtr interface{}, argDesc
}
// Emit mutation event for event-driven daemon
s.emitMutation(MutationUpdate, issueID)
// Title/assignee empty for simple store operations (would require extra lookup)
s.emitMutation(MutationUpdate, issueID, "", "")
return Response{Success: true}
}
@@ -147,7 +149,8 @@ func (s *Server) handleCommentAdd(req *Request) Response {
}
// Emit mutation event for event-driven daemon
s.emitMutation(MutationComment, commentArgs.ID)
// Title/assignee empty for comment operations (would require extra lookup)
s.emitMutation(MutationComment, commentArgs.ID, "", "")
data, _ := json.Marshal(comment)
return Response{

View File

@@ -13,7 +13,7 @@ func TestEmitMutation(t *testing.T) {
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit a mutation
server.emitMutation(MutationCreate, "bd-123")
server.emitMutation(MutationCreate, "bd-123", "Test Issue", "")
// Check that mutation was stored in buffer
mutations := server.GetRecentMutations(0)
@@ -45,14 +45,14 @@ func TestGetRecentMutations_TimestampFiltering(t *testing.T) {
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit mutations with delays
server.emitMutation(MutationCreate, "bd-1")
server.emitMutation(MutationCreate, "bd-1", "Issue 1", "")
time.Sleep(10 * time.Millisecond)
checkpoint := time.Now().UnixMilli()
time.Sleep(10 * time.Millisecond)
server.emitMutation(MutationUpdate, "bd-2")
server.emitMutation(MutationUpdate, "bd-3")
server.emitMutation(MutationUpdate, "bd-2", "Issue 2", "")
server.emitMutation(MutationUpdate, "bd-3", "Issue 3", "")
// Get mutations after checkpoint
mutations := server.GetRecentMutations(checkpoint)
@@ -82,7 +82,7 @@ func TestGetRecentMutations_CircularBuffer(t *testing.T) {
// Emit more than maxMutationBuffer (100) mutations
for i := 0; i < 150; i++ {
server.emitMutation(MutationCreate, "bd-"+string(rune(i)))
server.emitMutation(MutationCreate, "bd-"+string(rune(i)), "", "")
time.Sleep(time.Millisecond) // Ensure different timestamps
}
@@ -110,7 +110,7 @@ func TestGetRecentMutations_ConcurrentAccess(t *testing.T) {
// Writer goroutine
go func() {
for i := 0; i < 50; i++ {
server.emitMutation(MutationUpdate, "bd-write")
server.emitMutation(MutationUpdate, "bd-write", "", "")
time.Sleep(time.Millisecond)
}
done <- true
@@ -141,11 +141,11 @@ func TestHandleGetMutations(t *testing.T) {
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit some mutations
server.emitMutation(MutationCreate, "bd-1")
server.emitMutation(MutationCreate, "bd-1", "Issue 1", "")
time.Sleep(10 * time.Millisecond)
checkpoint := time.Now().UnixMilli()
time.Sleep(10 * time.Millisecond)
server.emitMutation(MutationUpdate, "bd-2")
server.emitMutation(MutationUpdate, "bd-2", "Issue 2", "")
// Create RPC request
args := GetMutationsArgs{Since: checkpoint}
@@ -213,7 +213,7 @@ func TestMutationEventTypes(t *testing.T) {
}
for _, mutationType := range types {
server.emitMutation(mutationType, "bd-test")
server.emitMutation(mutationType, "bd-test", "", "")
}
mutations := server.GetRecentMutations(0)
@@ -305,7 +305,7 @@ func TestMutationTimestamps(t *testing.T) {
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
before := time.Now()
server.emitMutation(MutationCreate, "bd-123")
server.emitMutation(MutationCreate, "bd-123", "Test Issue", "")
after := time.Now()
mutations := server.GetRecentMutations(0)
@@ -327,7 +327,7 @@ func TestEmitMutation_NonBlocking(t *testing.T) {
// Fill the buffer (default size is 512 from BEADS_MUTATION_BUFFER or default)
for i := 0; i < 600; i++ {
// This should not block even when channel is full
server.emitMutation(MutationCreate, "bd-test")
server.emitMutation(MutationCreate, "bd-test", "", "")
}
// Verify mutations were still stored in recent buffer

View File

@@ -219,8 +219,23 @@ func (s *Server) handleRequest(req *Request) Response {
resp = s.handleEpicStatus(req)
case OpGetMutations:
resp = s.handleGetMutations(req)
case OpGetMoleculeProgress:
resp = s.handleGetMoleculeProgress(req)
case OpGetWorkerStatus:
resp = s.handleGetWorkerStatus(req)
case OpShutdown:
resp = s.handleShutdown(req)
// Gate operations (bd-likt)
case OpGateCreate:
resp = s.handleGateCreate(req)
case OpGateList:
resp = s.handleGateList(req)
case OpGateShow:
resp = s.handleGateShow(req)
case OpGateClose:
resp = s.handleGateClose(req)
case OpGateWait:
resp = s.handleGateWait(req)
default:
s.metrics.RecordError(req.Operation)
return Response{
@@ -379,3 +394,107 @@ func (s *Server) handleMetrics(_ *Request) Response {
Data: data,
}
}
func (s *Server) handleGetWorkerStatus(req *Request) Response {
ctx := s.reqCtx(req)
// Parse optional args
var args GetWorkerStatusArgs
if len(req.Args) > 0 {
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid args: %v", err),
}
}
}
// Build filter: find all in_progress issues with assignees
filter := types.IssueFilter{
Status: func() *types.Status { s := types.StatusInProgress; return &s }(),
}
if args.Assignee != "" {
filter.Assignee = &args.Assignee
}
// Get all in_progress issues (potential workers)
issues, err := s.storage.SearchIssues(ctx, "", filter)
if err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("failed to search issues: %v", err),
}
}
var workers []WorkerStatus
for _, issue := range issues {
// Skip issues without assignees
if issue.Assignee == "" {
continue
}
worker := WorkerStatus{
Assignee: issue.Assignee,
LastActivity: issue.UpdatedAt.Format(time.RFC3339),
Status: string(issue.Status),
}
// Check if this issue is a child of a molecule/epic (has parent-child dependency)
deps, err := s.storage.GetDependencyRecords(ctx, issue.ID)
if err == nil {
for _, dep := range deps {
if dep.Type == types.DepParentChild {
// This issue is a child - get the parent molecule
parentIssue, err := s.storage.GetIssue(ctx, dep.DependsOnID)
if err == nil && parentIssue != nil {
worker.MoleculeID = parentIssue.ID
worker.MoleculeTitle = parentIssue.Title
worker.StepID = issue.ID
worker.StepTitle = issue.Title
// Count total steps and determine current step number
// by getting all children of the molecule
children, err := s.storage.GetDependents(ctx, parentIssue.ID)
if err == nil {
// Filter to only parent-child dependencies
var steps []*types.Issue
for _, child := range children {
childDeps, err := s.storage.GetDependencyRecords(ctx, child.ID)
if err == nil {
for _, childDep := range childDeps {
if childDep.Type == types.DepParentChild && childDep.DependsOnID == parentIssue.ID {
steps = append(steps, child)
break
}
}
}
}
worker.TotalSteps = len(steps)
// Find current step number (1-indexed)
for i, step := range steps {
if step.ID == issue.ID {
worker.CurrentStep = i + 1
break
}
}
}
}
break // Found the parent, no need to check other deps
}
}
}
workers = append(workers, worker)
}
resp := GetWorkerStatusResponse{
Workers: workers,
}
data, _ := json.Marshal(resp)
return Response{
Success: true,
Data: data,
}
}

View File

@@ -0,0 +1,314 @@
package rpc
import (
"context"
"testing"
"time"
"github.com/steveyegge/beads/internal/types"
)
func TestGetWorkerStatus_NoWorkers(t *testing.T) {
_, client, cleanup := setupTestServer(t)
defer cleanup()
// With no in_progress issues assigned, should return empty list
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
if err != nil {
t.Fatalf("GetWorkerStatus failed: %v", err)
}
if len(result.Workers) != 0 {
t.Errorf("expected 0 workers, got %d", len(result.Workers))
}
}
func TestGetWorkerStatus_SingleWorker(t *testing.T) {
server, client, cleanup := setupTestServer(t)
defer cleanup()
ctx := context.Background()
// Create an in_progress issue with an assignee
issue := &types.Issue{
ID: "bd-test1",
Title: "Test task",
Status: types.StatusInProgress,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker1",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := server.storage.CreateIssue(ctx, issue, "test"); err != nil {
t.Fatalf("failed to create issue: %v", err)
}
// Query worker status
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
if err != nil {
t.Fatalf("GetWorkerStatus failed: %v", err)
}
if len(result.Workers) != 1 {
t.Fatalf("expected 1 worker, got %d", len(result.Workers))
}
worker := result.Workers[0]
if worker.Assignee != "worker1" {
t.Errorf("expected assignee 'worker1', got '%s'", worker.Assignee)
}
if worker.Status != "in_progress" {
t.Errorf("expected status 'in_progress', got '%s'", worker.Status)
}
if worker.LastActivity == "" {
t.Error("expected last activity to be set")
}
// Not part of a molecule, so these should be empty
if worker.MoleculeID != "" {
t.Errorf("expected empty molecule ID, got '%s'", worker.MoleculeID)
}
}
func TestGetWorkerStatus_WithMolecule(t *testing.T) {
server, client, cleanup := setupTestServer(t)
defer cleanup()
ctx := context.Background()
// Create a molecule (epic)
molecule := &types.Issue{
ID: "bd-mol1",
Title: "Test Molecule",
Status: types.StatusOpen,
IssueType: types.TypeEpic,
Priority: 2,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := server.storage.CreateIssue(ctx, molecule, "test"); err != nil {
t.Fatalf("failed to create molecule: %v", err)
}
// Create step 1 (completed)
step1 := &types.Issue{
ID: "bd-step1",
Title: "Step 1: Setup",
Status: types.StatusClosed,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker1",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
ClosedAt: func() *time.Time { t := time.Now(); return &t }(),
}
if err := server.storage.CreateIssue(ctx, step1, "test"); err != nil {
t.Fatalf("failed to create step1: %v", err)
}
// Create step 2 (current step - in progress)
step2 := &types.Issue{
ID: "bd-step2",
Title: "Step 2: Implementation",
Status: types.StatusInProgress,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker1",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := server.storage.CreateIssue(ctx, step2, "test"); err != nil {
t.Fatalf("failed to create step2: %v", err)
}
// Create step 3 (pending)
step3 := &types.Issue{
ID: "bd-step3",
Title: "Step 3: Testing",
Status: types.StatusOpen,
IssueType: types.TypeTask,
Priority: 2,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := server.storage.CreateIssue(ctx, step3, "test"); err != nil {
t.Fatalf("failed to create step3: %v", err)
}
// Add parent-child dependencies (steps depend on molecule)
for _, stepID := range []string{"bd-step1", "bd-step2", "bd-step3"} {
dep := &types.Dependency{
IssueID: stepID,
DependsOnID: "bd-mol1",
Type: types.DepParentChild,
CreatedAt: time.Now(),
CreatedBy: "test",
}
if err := server.storage.AddDependency(ctx, dep, "test"); err != nil {
t.Fatalf("failed to add dependency for %s: %v", stepID, err)
}
}
// Query worker status
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
if err != nil {
t.Fatalf("GetWorkerStatus failed: %v", err)
}
if len(result.Workers) != 1 {
t.Fatalf("expected 1 worker (only in_progress issues), got %d", len(result.Workers))
}
worker := result.Workers[0]
if worker.Assignee != "worker1" {
t.Errorf("expected assignee 'worker1', got '%s'", worker.Assignee)
}
if worker.MoleculeID != "bd-mol1" {
t.Errorf("expected molecule ID 'bd-mol1', got '%s'", worker.MoleculeID)
}
if worker.MoleculeTitle != "Test Molecule" {
t.Errorf("expected molecule title 'Test Molecule', got '%s'", worker.MoleculeTitle)
}
if worker.StepID != "bd-step2" {
t.Errorf("expected step ID 'bd-step2', got '%s'", worker.StepID)
}
if worker.StepTitle != "Step 2: Implementation" {
t.Errorf("expected step title 'Step 2: Implementation', got '%s'", worker.StepTitle)
}
if worker.TotalSteps != 3 {
t.Errorf("expected 3 total steps, got %d", worker.TotalSteps)
}
// Note: CurrentStep ordering depends on how GetDependents orders results
// Just verify it's set
if worker.CurrentStep < 1 || worker.CurrentStep > 3 {
t.Errorf("expected current step between 1 and 3, got %d", worker.CurrentStep)
}
}
func TestGetWorkerStatus_FilterByAssignee(t *testing.T) {
server, client, cleanup := setupTestServer(t)
defer cleanup()
ctx := context.Background()
// Create issues for two different workers
issue1 := &types.Issue{
ID: "bd-test1",
Title: "Task for worker1",
Status: types.StatusInProgress,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker1",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
issue2 := &types.Issue{
ID: "bd-test2",
Title: "Task for worker2",
Status: types.StatusInProgress,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker2",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
if err := server.storage.CreateIssue(ctx, issue1, "test"); err != nil {
t.Fatalf("failed to create issue1: %v", err)
}
if err := server.storage.CreateIssue(ctx, issue2, "test"); err != nil {
t.Fatalf("failed to create issue2: %v", err)
}
// Query all workers
allResult, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
if err != nil {
t.Fatalf("GetWorkerStatus (all) failed: %v", err)
}
if len(allResult.Workers) != 2 {
t.Errorf("expected 2 workers, got %d", len(allResult.Workers))
}
// Query specific worker
filteredResult, err := client.GetWorkerStatus(&GetWorkerStatusArgs{Assignee: "worker1"})
if err != nil {
t.Fatalf("GetWorkerStatus (filtered) failed: %v", err)
}
if len(filteredResult.Workers) != 1 {
t.Fatalf("expected 1 worker, got %d", len(filteredResult.Workers))
}
if filteredResult.Workers[0].Assignee != "worker1" {
t.Errorf("expected assignee 'worker1', got '%s'", filteredResult.Workers[0].Assignee)
}
}
func TestGetWorkerStatus_OnlyInProgressIssues(t *testing.T) {
server, client, cleanup := setupTestServer(t)
defer cleanup()
ctx := context.Background()
// Create issues with different statuses
openIssue := &types.Issue{
ID: "bd-open",
Title: "Open task",
Status: types.StatusOpen,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker1",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
inProgressIssue := &types.Issue{
ID: "bd-inprog",
Title: "In progress task",
Status: types.StatusInProgress,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker2",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
closedIssue := &types.Issue{
ID: "bd-closed",
Title: "Closed task",
Status: types.StatusClosed,
IssueType: types.TypeTask,
Priority: 2,
Assignee: "worker3",
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
ClosedAt: func() *time.Time { t := time.Now(); return &t }(),
}
for _, issue := range []*types.Issue{openIssue, inProgressIssue, closedIssue} {
if err := server.storage.CreateIssue(ctx, issue, "test"); err != nil {
t.Fatalf("failed to create issue %s: %v", issue.ID, err)
}
}
// Query worker status - should only return in_progress issues
result, err := client.GetWorkerStatus(&GetWorkerStatusArgs{})
if err != nil {
t.Fatalf("GetWorkerStatus failed: %v", err)
}
if len(result.Workers) != 1 {
t.Fatalf("expected 1 worker (only in_progress), got %d", len(result.Workers))
}
if result.Workers[0].Assignee != "worker2" {
t.Errorf("expected assignee 'worker2', got '%s'", result.Workers[0].Assignee)
}
}