feat: Add bd repos multi-repo commands and fix bd ready for in_progress issues

- Add 'bd repos' command for multi-repository management (bd-123)
  - bd repos list: show all cached repositories
  - bd repos ready: aggregate ready work across repos
  - bd repos stats: combined statistics across repos
  - bd repos clear-cache: clear repository cache
  - Requires global daemon (bd daemon --global)

- Fix bd ready to show in_progress issues (bd-165)
  - bd ready now shows both 'open' and 'in_progress' issues with no blockers
  - Allows epics/tasks ready to close to appear in ready work
  - Critical P0 bug fix for workflow

- Apply code review improvements to repos implementation
  - Use strongly typed RPC responses (remove interface{})
  - Fix clear-cache lock handling (close connections outside lock)
  - Add error collection for per-repo failures
  - Add context timeouts (1-2s) to prevent hangs
  - Add lock strategy comments

- Update documentation (README.md, AGENTS.md)
- Add comprehensive tests for both features

Amp-Thread-ID: https://ampcode.com/threads/T-1de989a1-1890-492c-9847-a34144259e0f
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-10-18 00:37:27 -07:00
parent 56a379dc5a
commit fb9b5864af
10 changed files with 773 additions and 23 deletions

View File

@@ -190,3 +190,23 @@ func (c *Client) RemoveLabel(args *LabelRemoveArgs) (*Response, error) {
func (c *Client) Batch(args *BatchArgs) (*Response, error) {
return c.Execute(OpBatch, args)
}
// ReposList lists all cached repositories
func (c *Client) ReposList() (*Response, error) {
return c.Execute(OpReposList, struct{}{})
}
// ReposReady gets ready work across all repositories
func (c *Client) ReposReady(args *ReposReadyArgs) (*Response, error) {
return c.Execute(OpReposReady, args)
}
// ReposStats gets combined statistics across all repositories
func (c *Client) ReposStats() (*Response, error) {
return c.Execute(OpReposStats, struct{}{})
}
// ReposClearCache clears the repository cache
func (c *Client) ReposClearCache() (*Response, error) {
return c.Execute(OpReposClearCache, struct{}{})
}

View File

@@ -1,23 +1,31 @@
package rpc
import "encoding/json"
import (
"encoding/json"
"github.com/steveyegge/beads/internal/types"
)
// Operation constants for all bd commands
const (
OpPing = "ping"
OpCreate = "create"
OpUpdate = "update"
OpClose = "close"
OpList = "list"
OpShow = "show"
OpReady = "ready"
OpStats = "stats"
OpDepAdd = "dep_add"
OpDepRemove = "dep_remove"
OpDepTree = "dep_tree"
OpLabelAdd = "label_add"
OpLabelRemove = "label_remove"
OpBatch = "batch"
OpPing = "ping"
OpCreate = "create"
OpUpdate = "update"
OpClose = "close"
OpList = "list"
OpShow = "show"
OpReady = "ready"
OpStats = "stats"
OpDepAdd = "dep_add"
OpDepRemove = "dep_remove"
OpDepTree = "dep_tree"
OpLabelAdd = "label_add"
OpLabelRemove = "label_remove"
OpBatch = "batch"
OpReposList = "repos_list"
OpReposReady = "repos_ready"
OpReposStats = "repos_stats"
OpReposClearCache = "repos_clear_cache"
)
// Request represents an RPC request from client to daemon
@@ -151,3 +159,38 @@ type BatchResult struct {
Data json.RawMessage `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// ReposReadyArgs represents arguments for repos ready operation
type ReposReadyArgs struct {
Assignee string `json:"assignee,omitempty"`
Priority *int `json:"priority,omitempty"`
Limit int `json:"limit,omitempty"`
GroupByRepo bool `json:"group_by_repo,omitempty"`
}
// RepoInfo represents information about a cached repository
type RepoInfo struct {
Path string `json:"path"`
Prefix string `json:"prefix"`
IssueCount int `json:"issue_count"`
LastAccess string `json:"last_access"`
}
// RepoReadyWork represents ready work for a single repository
type RepoReadyWork struct {
RepoPath string `json:"repo_path"`
Issues []*types.Issue `json:"issues"`
}
// ReposReadyIssue represents an issue with repo context
type ReposReadyIssue struct {
RepoPath string `json:"repo_path"`
Issue *types.Issue `json:"issue"`
}
// ReposStatsResponse contains combined statistics across repos
type ReposStatsResponse struct {
Total types.Statistics `json:"total"`
PerRepo map[string]types.Statistics `json:"per_repo"`
Errors map[string]string `json:"errors,omitempty"`
}

View File

@@ -11,6 +11,7 @@ import (
"path/filepath"
"sync"
"syscall"
"time"
"github.com/steveyegge/beads/internal/storage"
"github.com/steveyegge/beads/internal/storage/sqlite"
@@ -176,6 +177,14 @@ func (s *Server) handleRequest(req *Request) Response {
return s.handleLabelRemove(req)
case OpBatch:
return s.handleBatch(req)
case OpReposList:
return s.handleReposList(req)
case OpReposReady:
return s.handleReposReady(req)
case OpReposStats:
return s.handleReposStats(req)
case OpReposClearCache:
return s.handleReposClearCache(req)
default:
return Response{
Success: false,
@@ -766,3 +775,203 @@ func (s *Server) writeResponse(writer *bufio.Writer, resp Response) {
writer.WriteByte('\n')
writer.Flush()
}
// Multi-repo handlers
func (s *Server) handleReposList(_ *Request) Response {
// Keep read lock during iteration to prevent stores from being closed mid-query
s.cacheMu.RLock()
defer s.cacheMu.RUnlock()
repos := make([]RepoInfo, 0, len(s.storageCache))
for path, store := range s.storageCache {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
stats, err := store.GetStatistics(ctx)
cancel()
if err != nil {
continue
}
// Extract prefix from a sample issue
filter := types.IssueFilter{Limit: 1}
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
issues, err := store.SearchIssues(ctx2, "", filter)
cancel2()
prefix := ""
if err == nil && len(issues) > 0 && len(issues[0].ID) > 0 {
// Extract prefix (everything before the last hyphen and number)
id := issues[0].ID
for i := len(id) - 1; i >= 0; i-- {
if id[i] == '-' {
prefix = id[:i+1]
break
}
}
}
repos = append(repos, RepoInfo{
Path: path,
Prefix: prefix,
IssueCount: stats.TotalIssues,
LastAccess: "active",
})
}
data, _ := json.Marshal(repos)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleReposReady(req *Request) Response {
var args ReposReadyArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid args: %v", err),
}
}
// Keep read lock during iteration to prevent stores from being closed mid-query
s.cacheMu.RLock()
defer s.cacheMu.RUnlock()
if args.GroupByRepo {
result := make([]RepoReadyWork, 0, len(s.storageCache))
for path, store := range s.storageCache {
filter := types.WorkFilter{
Status: types.StatusOpen,
Limit: args.Limit,
}
if args.Priority != nil {
filter.Priority = args.Priority
}
if args.Assignee != "" {
filter.Assignee = &args.Assignee
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
issues, err := store.GetReadyWork(ctx, filter)
cancel()
if err != nil || len(issues) == 0 {
continue
}
result = append(result, RepoReadyWork{
RepoPath: path,
Issues: issues,
})
}
data, _ := json.Marshal(result)
return Response{
Success: true,
Data: data,
}
}
// Flat list of all ready issues across all repos
allIssues := make([]ReposReadyIssue, 0)
for path, store := range s.storageCache {
filter := types.WorkFilter{
Status: types.StatusOpen,
Limit: args.Limit,
}
if args.Priority != nil {
filter.Priority = args.Priority
}
if args.Assignee != "" {
filter.Assignee = &args.Assignee
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
issues, err := store.GetReadyWork(ctx, filter)
cancel()
if err != nil {
continue
}
for _, issue := range issues {
allIssues = append(allIssues, ReposReadyIssue{
RepoPath: path,
Issue: issue,
})
}
}
data, _ := json.Marshal(allIssues)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleReposStats(_ *Request) Response {
// Keep read lock during iteration to prevent stores from being closed mid-query
s.cacheMu.RLock()
defer s.cacheMu.RUnlock()
total := types.Statistics{}
perRepo := make(map[string]types.Statistics)
errors := make(map[string]string)
for path, store := range s.storageCache {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
stats, err := store.GetStatistics(ctx)
cancel()
if err != nil {
errors[path] = err.Error()
continue
}
perRepo[path] = *stats
// Aggregate totals
total.TotalIssues += stats.TotalIssues
total.OpenIssues += stats.OpenIssues
total.InProgressIssues += stats.InProgressIssues
total.ClosedIssues += stats.ClosedIssues
total.BlockedIssues += stats.BlockedIssues
total.ReadyIssues += stats.ReadyIssues
total.EpicsEligibleForClosure += stats.EpicsEligibleForClosure
}
result := ReposStatsResponse{
Total: total,
PerRepo: perRepo,
}
if len(errors) > 0 {
result.Errors = errors
}
data, _ := json.Marshal(result)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleReposClearCache(_ *Request) Response {
// Copy stores under write lock, clear cache, then close outside lock
// to avoid holding lock during potentially slow Close() operations
s.cacheMu.Lock()
stores := make([]storage.Storage, 0, len(s.storageCache))
for _, store := range s.storageCache {
stores = append(stores, store)
}
s.storageCache = make(map[string]storage.Storage)
s.cacheMu.Unlock()
// Close all storage connections without holding lock
for _, store := range stores {
if err := store.Close(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", err)
}
}
return Response{
Success: true,
Data: json.RawMessage(`{"message":"Cache cleared successfully"}`),
}
}

View File

@@ -10,18 +10,20 @@ import (
)
// GetReadyWork returns issues with no open blockers
// By default, shows both 'open' and 'in_progress' issues so epics/tasks
// ready to close are visible (bd-165)
func (s *SQLiteStorage) GetReadyWork(ctx context.Context, filter types.WorkFilter) ([]*types.Issue, error) {
whereClauses := []string{}
args := []interface{}{}
// Default to open status if not specified
// Default to open OR in_progress if not specified (bd-165)
if filter.Status == "" {
filter.Status = types.StatusOpen
whereClauses = append(whereClauses, "i.status IN ('open', 'in_progress')")
} else {
whereClauses = append(whereClauses, "i.status = ?")
args = append(args, filter.Status)
}
whereClauses = append(whereClauses, "i.status = ?")
args = append(args, filter.Status)
if filter.Priority != nil {
whereClauses = append(whereClauses, "i.priority = ?")
args = append(args, *filter.Priority)

View File

@@ -751,3 +751,74 @@ func TestDeepHierarchyBlocking(t *testing.T) {
}
}
}
func TestGetReadyWorkIncludesInProgress(t *testing.T) {
store, cleanup := setupTestDB(t)
defer cleanup()
ctx := context.Background()
// Create issues:
// bd-1: open, no dependencies → READY
// bd-2: in_progress, no dependencies → READY (bd-165)
// bd-3: in_progress, depends on open issue → BLOCKED
// bd-4: closed, no dependencies → NOT READY (closed)
issue1 := &types.Issue{Title: "Open Ready", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
issue2 := &types.Issue{Title: "In Progress Ready", Status: types.StatusInProgress, Priority: 2, IssueType: types.TypeEpic}
issue3 := &types.Issue{Title: "In Progress Blocked", Status: types.StatusInProgress, Priority: 1, IssueType: types.TypeTask}
issue4 := &types.Issue{Title: "Blocker", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
issue5 := &types.Issue{Title: "Closed", Status: types.StatusClosed, Priority: 1, IssueType: types.TypeTask}
store.CreateIssue(ctx, issue1, "test-user")
store.CreateIssue(ctx, issue2, "test-user")
store.UpdateIssue(ctx, issue2.ID, map[string]interface{}{"status": types.StatusInProgress}, "test-user")
store.CreateIssue(ctx, issue3, "test-user")
store.UpdateIssue(ctx, issue3.ID, map[string]interface{}{"status": types.StatusInProgress}, "test-user")
store.CreateIssue(ctx, issue4, "test-user")
store.CreateIssue(ctx, issue5, "test-user")
store.CloseIssue(ctx, issue5.ID, "Done", "test-user")
// Add dependency: issue3 blocks on issue4
store.AddDependency(ctx, &types.Dependency{IssueID: issue3.ID, DependsOnID: issue4.ID, Type: types.DepBlocks}, "test-user")
// Get ready work (default filter - no status specified)
ready, err := store.GetReadyWork(ctx, types.WorkFilter{})
if err != nil {
t.Fatalf("GetReadyWork failed: %v", err)
}
// Should have 3 ready issues:
// - issue1 (open, no blockers)
// - issue2 (in_progress, no blockers) ← this is the key test case for bd-165
// - issue4 (open blocker, but itself has no blockers so it's ready to work on)
if len(ready) != 3 {
t.Logf("Ready issues:")
for _, r := range ready {
t.Logf(" - %s: %s (status: %s)", r.ID, r.Title, r.Status)
}
t.Fatalf("Expected 3 ready issues, got %d", len(ready))
}
// Verify ready issues
readyIDs := make(map[string]bool)
for _, issue := range ready {
readyIDs[issue.ID] = true
}
if !readyIDs[issue1.ID] {
t.Errorf("Expected %s (open, no blockers) to be ready", issue1.ID)
}
if !readyIDs[issue2.ID] {
t.Errorf("Expected %s (in_progress, no blockers) to be ready - this is bd-165!", issue2.ID)
}
if !readyIDs[issue4.ID] {
t.Errorf("Expected %s (open blocker, but itself unblocked) to be ready", issue4.ID)
}
if readyIDs[issue3.ID] {
t.Errorf("Expected %s (in_progress, blocked) to NOT be ready", issue3.ID)
}
if readyIDs[issue5.ID] {
t.Errorf("Expected %s (closed) to NOT be ready", issue5.ID)
}
}