diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 2b390167..2dddcb62 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -49,7 +49,7 @@ {"id":"bd-142","title":"Add 'bd stale' command to show orphaned claims and dead executors","description":"Need visibility into orphaned claims - issues stuck in_progress with execution_state but executor is dead/stopped. Add command to show: 1) All issues with execution_state where executor status=stopped or last_heartbeat \u003e threshold, 2) Executor instance details (when died, how long claimed), 3) Option to auto-release them. Makes manual recovery easier until auto-cleanup (bd-140) is implemented.","design":"Query: SELECT i.*, ei.status, ei.last_heartbeat FROM issues i JOIN issue_execution_state ies ON i.id = ies.issue_id JOIN executor_instances ei ON ies.executor_instance_id = ei.instance_id WHERE ei.status='stopped' OR ei.last_heartbeat \u003c NOW() - threshold. Add --release flag to auto-release all found issues.","acceptance_criteria":"bd stale shows orphaned claims, bd stale --release cleans them up","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-10-18T00:25:16.530937-07:00","updated_at":"2025-10-18T09:57:28.141701-07:00","closed_at":"2025-10-18T02:09:12.529064-07:00"} {"id":"bd-143","title":"Bias ready work towards recent issues before oldest-first","description":"Currently 'bd ready' shows oldest issues first (by created_at). This can bury recently discovered work that might be more relevant. Propose a hybrid approach: show issues from the past 1-2 days first (sorted by priority), then fall back to oldest-first for older issues. This keeps fresh discoveries visible while still surfacing old forgotten work.","status":"closed","priority":2,"issue_type":"feature","created_at":"2025-10-18T09:31:15.036495-07:00","updated_at":"2025-10-18T09:57:28.105887-07:00","closed_at":"2025-10-18T09:35:55.084891-07:00"} {"id":"bd-144","title":"Fix nil pointer dereference in renumber command","description":"The 'bd renumber' command crashes with a nil pointer dereference at renumber.go:52 because store is nil. The command doesn't properly handle daemon/direct mode initialization like other commands do. Error occurs on both --dry-run and --force modes.","status":"closed","priority":1,"issue_type":"bug","created_at":"2025-10-18T09:54:31.59912-07:00","updated_at":"2025-10-18T09:57:28.106373-07:00","closed_at":"2025-10-18T09:56:49.88701-07:00"} -{"id":"bd-145","title":"Add storage cache eviction policy to daemon","description":"Daemon caches DB connections forever in storageCache map (server.go:29). For users with 50+ repos, this causes memory leaks and file descriptor exhaustion.\n\nNeed LRU cache with:\n- Max size limit (default: 50 repos)\n- TTL-based eviction (default: 30min idle)\n- Periodic cleanup goroutine\n\nLocation: internal/rpc/server.go:29-40","design":"Add StorageCacheEntry struct with lastAccess timestamp.\n\nImplement evictStaleStorage() method that runs every 5 minutes to close connections idle \u003e30min.\n\nAdd max cache size enforcement (LRU eviction when full).\n\nMake limits configurable via env vars:\n- BEADS_DAEMON_MAX_CACHE_SIZE (default: 50)\n- BEADS_DAEMON_CACHE_TTL (default: 30m)","acceptance_criteria":"- Cache evicts entries after 30min idle\n- Cache respects max size limit\n- Cleanup goroutine runs periodically\n- Evicted storage connections are properly closed\n- No resource leaks under sustained load\n- Unit tests for eviction logic","status":"in_progress","priority":0,"issue_type":"feature","created_at":"2025-10-18T13:05:46.174245-07:00","updated_at":"2025-10-18T13:13:08.805418-07:00","dependencies":[{"issue_id":"bd-145","depends_on_id":"bd-155","type":"parent-child","created_at":"2025-10-18T13:07:49.077954-07:00","created_by":"daemon"}]} +{"id":"bd-145","title":"Add storage cache eviction policy to daemon","description":"Daemon caches DB connections forever in storageCache map (server.go:29). For users with 50+ repos, this causes memory leaks and file descriptor exhaustion.\n\nNeed LRU cache with:\n- Max size limit (default: 50 repos)\n- TTL-based eviction (default: 30min idle)\n- Periodic cleanup goroutine\n\nLocation: internal/rpc/server.go:29-40","design":"Add StorageCacheEntry struct with lastAccess timestamp.\n\nImplement evictStaleStorage() method that runs every 5 minutes to close connections idle \u003e30min.\n\nAdd max cache size enforcement (LRU eviction when full).\n\nMake limits configurable via env vars:\n- BEADS_DAEMON_MAX_CACHE_SIZE (default: 50)\n- BEADS_DAEMON_CACHE_TTL (default: 30m)","acceptance_criteria":"- Cache evicts entries after 30min idle\n- Cache respects max size limit\n- Cleanup goroutine runs periodically\n- Evicted storage connections are properly closed\n- No resource leaks under sustained load\n- Unit tests for eviction logic","status":"closed","priority":0,"issue_type":"feature","created_at":"2025-10-18T13:05:46.174245-07:00","updated_at":"2025-10-18T13:16:56.921023-07:00","closed_at":"2025-10-18T13:16:56.921023-07:00","dependencies":[{"issue_id":"bd-145","depends_on_id":"bd-155","type":"parent-child","created_at":"2025-10-18T13:07:49.077954-07:00","created_by":"daemon"}]} {"id":"bd-146","title":"Add daemon health check endpoint and probes","description":"Auto-start only checks socket existence, not daemon responsiveness. Daemon can be running but unresponsive (deadlock, hung DB). Users work in degraded direct mode without knowing why.\n\nNeed health check RPC operation that:\n- Tests DB connectivity (1s timeout)\n- Returns uptime, status, metrics\n- Used by auto-start before connecting\n- Enables monitoring/alerting\n\nLocation: internal/rpc/server.go, cmd/bd/main.go:100-108","design":"Add OpHealth RPC operation to protocol.\n\nhandleHealth() implementation:\n- Quick DB ping with 1s timeout\n- Return status, uptime, version\n- Include basic metrics (connections, cache size)\n\nUpdate TryConnect() to call Health() after socket connection:\n- If health check fails, close connection and return nil\n- Enables transparent failover to direct mode\n\nAdd 'bd daemon --health' CLI command for monitoring.","acceptance_criteria":"- Health check RPC endpoint works\n- Returns structured health status\n- Client uses health check before operations\n- bd daemon --health command exists\n- Unhealthy daemon triggers auto-restart or fallback\n- Health check completes in \u003c2 seconds","status":"open","priority":0,"issue_type":"feature","created_at":"2025-10-18T13:05:58.647592-07:00","updated_at":"2025-10-18T13:05:58.647592-07:00","dependencies":[{"issue_id":"bd-146","depends_on_id":"bd-155","type":"parent-child","created_at":"2025-10-18T13:07:49.093618-07:00","created_by":"daemon"}]} {"id":"bd-147","title":"Add stale socket and crash recovery for daemon","description":"When daemon crashes (panic, OOM, signal), socket file remains and blocks new daemon start. Users must manually remove .beads/bd.sock.\n\nProblems:\n- Socket file remains after crash\n- PID file remains (isDaemonRunning false positive)\n- No automatic recovery\n- Users get 'daemon already running' error\n\nLocation: cmd/bd/daemon.go, cmd/bd/main.go:221-311","design":"Improve stale detection in tryAutoStartDaemon():\n\n1. If socket exists, try to connect\n2. If connection fails → stale socket, remove it\n3. Also remove PID file and lock files\n4. Retry daemon start\n\nAdd self-healing to daemon startup:\n- On startup, check for stale PID files\n- If PID in file doesn't exist, remove and continue\n- Use exclusive file lock to prevent races\n\nOptional: Add crash recovery watchdog that restarts daemon on exit.","acceptance_criteria":"- Stale sockets are automatically detected and removed\n- Auto-start recovers from daemon crashes\n- No manual intervention needed for crash recovery\n- PID file management is robust\n- Lock files prevent multiple daemon instances\n- Tests for crash recovery scenarios","status":"open","priority":0,"issue_type":"bug","created_at":"2025-10-18T13:06:10.116917-07:00","updated_at":"2025-10-18T13:06:10.116917-07:00","dependencies":[{"issue_id":"bd-147","depends_on_id":"bd-155","type":"parent-child","created_at":"2025-10-18T13:07:49.108099-07:00","created_by":"daemon"}]} {"id":"bd-148","title":"Add lifecycle management for beads-mcp processes","description":"MCP server processes accumulate without cleanup. Each tool invocation spawns a new Python process that lingers after Claude disconnects.\n\nObserved: 6+ beads-mcp processes running simultaneously.\n\nProblems:\n- No parent-child relationship tracking\n- No cleanup on MCP client disconnect\n- Processes leak over days of use\n- Could accumulate hundreds of processes\n\nLocation: integrations/beads-mcp/src/beads_mcp/server.py","design":"Add proper cleanup handlers to MCP server:\n\n1. Register atexit handler to close daemon connections\n2. Handle SIGTERM/SIGINT for graceful shutdown\n3. Close daemon client in cleanup()\n4. Remove any temp files\n\nOptional improvements:\n- Track active connections to daemon\n- Implement connection pooling\n- Add process timeout/TTL\n- Log lifecycle events for debugging\n\nExample:\nimport atexit\nimport signal\n\ndef cleanup():\n # Close daemon connections\n # Remove temp files\n pass\n\natexit.register(cleanup)\nsignal.signal(signal.SIGTERM, lambda s, f: cleanup())","acceptance_criteria":"- MCP processes clean up on exit\n- Daemon connections are properly closed\n- No process leaks after repeated use\n- Signal handlers work correctly\n- Cleanup runs on normal and abnormal exit\n- Test with multiple concurrent MCP invocations","status":"open","priority":0,"issue_type":"bug","created_at":"2025-10-18T13:06:22.030027-07:00","updated_at":"2025-10-18T13:06:22.030027-07:00","dependencies":[{"issue_id":"bd-148","depends_on_id":"bd-155","type":"parent-child","created_at":"2025-10-18T13:07:49.121494-07:00","created_by":"daemon"}]} diff --git a/internal/rpc/server.go b/internal/rpc/server.go index eec2ef50..9b18712f 100644 --- a/internal/rpc/server.go +++ b/internal/rpc/server.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "path/filepath" + "sort" "sync" "syscall" "time" @@ -32,8 +33,9 @@ type Server struct { mu sync.RWMutex shutdown bool shutdownChan chan struct{} + stopOnce sync.Once // Per-request storage routing with eviction support - storageCache map[string]*StorageCacheEntry // path -> entry + storageCache map[string]*StorageCacheEntry // repoRoot -> entry cacheMu sync.RWMutex maxCacheSize int cacheTTL time.Duration @@ -54,7 +56,7 @@ func NewServer(socketPath string, store storage.Storage) *Server { cacheTTL := 30 * time.Minute // default if env := os.Getenv("BEADS_DAEMON_CACHE_TTL"); env != "" { - if ttl, err := time.ParseDuration(env); err == nil { + if ttl, err := time.ParseDuration(env); err == nil && ttl > 0 { cacheTTL = ttl } } @@ -112,38 +114,43 @@ func (s *Server) Start(ctx context.Context) error { // Stop stops the RPC server and cleans up resources func (s *Server) Stop() error { - s.mu.Lock() - s.shutdown = true - s.mu.Unlock() + var err error + s.stopOnce.Do(func() { + s.mu.Lock() + s.shutdown = true + s.mu.Unlock() - // Signal cleanup goroutine to stop - close(s.shutdownChan) - - if s.cleanupTicker != nil { - s.cleanupTicker.Stop() - } + // Signal cleanup goroutine to stop + close(s.shutdownChan) - // Close all cached storage connections - s.cacheMu.Lock() - for _, entry := range s.storageCache { - if err := entry.store.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", err) + // Close all cached storage connections outside lock + s.cacheMu.Lock() + stores := make([]storage.Storage, 0, len(s.storageCache)) + for _, entry := range s.storageCache { + stores = append(stores, entry.store) } - } - s.storageCache = make(map[string]*StorageCacheEntry) - s.cacheMu.Unlock() + s.storageCache = make(map[string]*StorageCacheEntry) + s.cacheMu.Unlock() - if s.listener != nil { - if err := s.listener.Close(); err != nil { - return fmt.Errorf("failed to close listener: %w", err) + // Close stores without holding lock + for _, store := range stores { + if closeErr := store.Close(); closeErr != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", closeErr) + } } - } - if err := s.removeOldSocket(); err != nil { - return fmt.Errorf("failed to remove socket: %w", err) - } + if s.listener != nil { + if closeErr := s.listener.Close(); closeErr != nil { + err = fmt.Errorf("failed to close listener: %w", closeErr) + return + } + } - return nil + if removeErr := s.removeOldSocket(); removeErr != nil { + err = fmt.Errorf("failed to remove socket: %w", removeErr) + } + }) + return err } func (s *Server) ensureSocketDir() error { @@ -191,7 +198,6 @@ func (s *Server) evictStaleStorage() { toClose := []storage.Storage{} s.cacheMu.Lock() - defer s.cacheMu.Unlock() // First pass: evict TTL-expired entries for path, entry := range s.storageCache { @@ -205,22 +211,18 @@ func (s *Server) evictStaleStorage() { if len(s.storageCache) > s.maxCacheSize { // Build sorted list of entries by lastAccess type cacheItem struct { - path string - entry *StorageCacheEntry + path string + entry *StorageCacheEntry } items := make([]cacheItem, 0, len(s.storageCache)) for path, entry := range s.storageCache { items = append(items, cacheItem{path, entry}) } - // Sort by lastAccess (oldest first) - for i := 0; i < len(items)-1; i++ { - for j := i + 1; j < len(items); j++ { - if items[i].entry.lastAccess.After(items[j].entry.lastAccess) { - items[i], items[j] = items[j], items[i] - } - } - } + // Sort by lastAccess (oldest first) with sort.Slice + sort.Slice(items, func(i, j int) bool { + return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) + }) // Evict oldest entries until we're under the limit numToEvict := len(s.storageCache) - s.maxCacheSize @@ -230,14 +232,15 @@ func (s *Server) evictStaleStorage() { } } - // Close connections outside of lock to avoid blocking - go func() { - for _, store := range toClose { - if err := store.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) - } + // Unlock before closing to avoid holding lock during Close + s.cacheMu.Unlock() + + // Close connections synchronously + for _, store := range toClose { + if err := store.Close(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) } - }() + } } func (s *Server) handleConnection(conn net.Conn) { @@ -829,22 +832,26 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) { return s.storage, nil } - // Check cache first - s.cacheMu.Lock() - defer s.cacheMu.Unlock() - - if entry, ok := s.storageCache[req.Cwd]; ok { - // Update last access time - entry.lastAccess = time.Now() - return entry.store, nil - } - - // Find database for this cwd + // Find database for this cwd (to get the canonical repo root) dbPath := s.findDatabaseForCwd(req.Cwd) if dbPath == "" { return nil, fmt.Errorf("no .beads database found for path: %s", req.Cwd) } + // Canonicalize key to repo root (parent of .beads directory) + beadsDir := filepath.Dir(dbPath) + repoRoot := filepath.Dir(beadsDir) + + // Check cache first with write lock (to avoid race on lastAccess update) + s.cacheMu.Lock() + defer s.cacheMu.Unlock() + + if entry, ok := s.storageCache[repoRoot]; ok { + // Update last access time (safe under Lock) + entry.lastAccess = time.Now() + return entry.store, nil + } + // Open storage store, err := sqlite.New(dbPath) if err != nil { @@ -852,11 +859,22 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) { } // Cache it with current timestamp - s.storageCache[req.Cwd] = &StorageCacheEntry{ + s.storageCache[repoRoot] = &StorageCacheEntry{ store: store, lastAccess: time.Now(), } + // Enforce LRU immediately to prevent FD spikes between cleanup ticks + needEvict := len(s.storageCache) > s.maxCacheSize + s.cacheMu.Unlock() + + if needEvict { + s.evictStaleStorage() + } + + // Re-acquire lock for defer + s.cacheMu.Lock() + return store, nil } diff --git a/internal/rpc/server_eviction_test.go b/internal/rpc/server_eviction_test.go index d30b2743..f664507f 100644 --- a/internal/rpc/server_eviction_test.go +++ b/internal/rpc/server_eviction_test.go @@ -1,6 +1,7 @@ package rpc import ( + "fmt" "os" "path/filepath" "testing" @@ -305,3 +306,220 @@ func TestStorageCacheEviction_CleanupOnStop(t *testing.T) { t.Errorf("expected cache to be cleared on stop, got %d entries", cacheSize) } } + +func TestStorageCacheEviction_CanonicalKey(t *testing.T) { + tmpDir := t.TempDir() + + // Create main DB + mainDB := filepath.Join(tmpDir, "main.db") + mainStore, err := sqlite.New(mainDB) + if err != nil { + t.Fatal(err) + } + defer mainStore.Close() + + // Create server + socketPath := filepath.Join(tmpDir, "test.sock") + server := NewServer(socketPath, mainStore) + defer server.Stop() + + // Create test database + dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") + os.MkdirAll(filepath.Dir(dbPath), 0755) + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + store.Close() + + // Access from different subdirectories of the same repo + req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")} + _, err = server.getStorageForRequest(req1) + if err != nil { + t.Fatal(err) + } + + req2 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1")} + _, err = server.getStorageForRequest(req2) + if err != nil { + t.Fatal(err) + } + + req3 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1", "subdir2")} + _, err = server.getStorageForRequest(req3) + if err != nil { + t.Fatal(err) + } + + // Should only have one cache entry (all pointing to same repo root) + server.cacheMu.RLock() + cacheSize := len(server.storageCache) + server.cacheMu.RUnlock() + if cacheSize != 1 { + t.Errorf("expected 1 cached entry (canonical key), got %d", cacheSize) + } +} + +func TestStorageCacheEviction_ImmediateLRU(t *testing.T) { + tmpDir := t.TempDir() + + // Create main DB + mainDB := filepath.Join(tmpDir, "main.db") + mainStore, err := sqlite.New(mainDB) + if err != nil { + t.Fatal(err) + } + defer mainStore.Close() + + // Create server with max cache size of 2 + socketPath := filepath.Join(tmpDir, "test.sock") + server := NewServer(socketPath, mainStore) + server.maxCacheSize = 2 + server.cacheTTL = 1 * time.Hour // Long TTL + defer server.Stop() + + // Create 3 test databases + for i := 1; i <= 3; i++ { + dbPath := filepath.Join(tmpDir, fmt.Sprintf("repo%d", i), ".beads", "issues.db") + os.MkdirAll(filepath.Dir(dbPath), 0755) + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + store.Close() + } + + // Access all 3 repos + for i := 1; i <= 3; i++ { + req := &Request{Cwd: filepath.Join(tmpDir, fmt.Sprintf("repo%d", i))} + _, err = server.getStorageForRequest(req) + if err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) // Ensure different timestamps + } + + // Cache should never exceed maxCacheSize (immediate LRU enforcement) + server.cacheMu.RLock() + cacheSize := len(server.storageCache) + server.cacheMu.RUnlock() + if cacheSize > server.maxCacheSize { + t.Errorf("cache size %d exceeds max %d (immediate LRU not enforced)", cacheSize, server.maxCacheSize) + } +} + +func TestStorageCacheEviction_InvalidTTL(t *testing.T) { + tmpDir := t.TempDir() + + // Create main DB + mainDB := filepath.Join(tmpDir, "main.db") + mainStore, err := sqlite.New(mainDB) + if err != nil { + t.Fatal(err) + } + defer mainStore.Close() + + // Set invalid TTL + os.Setenv("BEADS_DAEMON_CACHE_TTL", "-5m") + defer os.Unsetenv("BEADS_DAEMON_CACHE_TTL") + + // Create server + socketPath := filepath.Join(tmpDir, "test.sock") + server := NewServer(socketPath, mainStore) + defer server.Stop() + + // Should fall back to default (30 minutes) + expectedTTL := 30 * time.Minute + if server.cacheTTL != expectedTTL { + t.Errorf("expected TTL to fall back to %v for invalid value, got %v", expectedTTL, server.cacheTTL) + } +} + +func TestStorageCacheEviction_ReopenAfterEviction(t *testing.T) { + tmpDir := t.TempDir() + + // Create main DB + mainDB := filepath.Join(tmpDir, "main.db") + mainStore, err := sqlite.New(mainDB) + if err != nil { + t.Fatal(err) + } + defer mainStore.Close() + + // Create server with short TTL + socketPath := filepath.Join(tmpDir, "test.sock") + server := NewServer(socketPath, mainStore) + server.cacheTTL = 50 * time.Millisecond + defer server.Stop() + + // Create test database + dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") + os.MkdirAll(filepath.Dir(dbPath), 0755) + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + store.Close() + + // Access repo + req := &Request{Cwd: filepath.Join(tmpDir, "repo1")} + _, err = server.getStorageForRequest(req) + if err != nil { + t.Fatal(err) + } + + // Wait for TTL to expire + time.Sleep(100 * time.Millisecond) + + // Evict + server.evictStaleStorage() + + // Verify evicted + server.cacheMu.RLock() + cacheSize := len(server.storageCache) + server.cacheMu.RUnlock() + if cacheSize != 0 { + t.Fatalf("expected cache to be empty after eviction, got %d", cacheSize) + } + + // Access again - should cleanly re-open + _, err = server.getStorageForRequest(req) + if err != nil { + t.Fatalf("failed to re-open after eviction: %v", err) + } + + // Verify re-cached + server.cacheMu.RLock() + cacheSize = len(server.storageCache) + server.cacheMu.RUnlock() + if cacheSize != 1 { + t.Errorf("expected 1 cached entry after re-open, got %d", cacheSize) + } +} + +func TestStorageCacheEviction_StopIdempotent(t *testing.T) { + tmpDir := t.TempDir() + + // Create main DB + mainDB := filepath.Join(tmpDir, "main.db") + mainStore, err := sqlite.New(mainDB) + if err != nil { + t.Fatal(err) + } + defer mainStore.Close() + + // Create server + socketPath := filepath.Join(tmpDir, "test.sock") + server := NewServer(socketPath, mainStore) + + // Stop multiple times - should not panic + if err := server.Stop(); err != nil { + t.Fatalf("first Stop failed: %v", err) + } + if err := server.Stop(); err != nil { + t.Fatalf("second Stop failed: %v", err) + } + if err := server.Stop(); err != nil { + t.Fatalf("third Stop failed: %v", err) + } +}