package rpc import ( "bufio" "context" "encoding/json" "fmt" "net" "os" "os/signal" "path/filepath" "runtime" "sort" "strings" "sync" "sync/atomic" "time" "github.com/steveyegge/beads/internal/compact" "github.com/steveyegge/beads/internal/storage" "github.com/steveyegge/beads/internal/storage/sqlite" "github.com/steveyegge/beads/internal/types" "golang.org/x/mod/semver" ) // ServerVersion is the version of this RPC server // This should match the bd CLI version for proper compatibility checks // It's set as a var so it can be initialized from main var ServerVersion = "0.9.10" // normalizeLabels trims whitespace, removes empty strings, and deduplicates labels func normalizeLabels(ss []string) []string { seen := make(map[string]struct{}) out := make([]string, 0, len(ss)) for _, s := range ss { s = strings.TrimSpace(s) if s == "" { continue } if _, ok := seen[s]; ok { continue } seen[s] = struct{}{} out = append(out, s) } return out } // StorageCacheEntry holds a cached storage with metadata for eviction type StorageCacheEntry struct { store storage.Storage lastAccess time.Time dbMtime time.Time // DB file modification time for detecting external changes } // Server represents the RPC server that runs in the daemon type Server struct { socketPath string storage storage.Storage // Default storage (for backward compat) listener net.Listener mu sync.RWMutex shutdown bool shutdownChan chan struct{} stopOnce sync.Once // Per-request storage routing with eviction support storageCache map[string]*StorageCacheEntry // repoRoot -> entry cacheMu sync.RWMutex maxCacheSize int cacheTTL time.Duration cleanupTicker *time.Ticker // Health and metrics startTime time.Time cacheHits int64 cacheMisses int64 metrics *Metrics // Connection limiting maxConns int activeConns int32 // atomic counter connSemaphore chan struct{} // Request timeout requestTimeout time.Duration // Ready channel signals when server is listening readyChan chan struct{} } // NewServer creates a new RPC server func NewServer(socketPath string, store storage.Storage) *Server { // Parse config from env vars maxCacheSize := 50 // default if env := os.Getenv("BEADS_DAEMON_MAX_CACHE_SIZE"); env != "" { // Parse as integer var size int if _, err := fmt.Sscanf(env, "%d", &size); err == nil && size > 0 { maxCacheSize = size } } cacheTTL := 30 * time.Minute // default if env := os.Getenv("BEADS_DAEMON_CACHE_TTL"); env != "" { if ttl, err := time.ParseDuration(env); err == nil && ttl > 0 { cacheTTL = ttl } } maxConns := 100 // default if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" { var conns int if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 { maxConns = conns } } requestTimeout := 30 * time.Second // default if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" { if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 { requestTimeout = timeout } } return &Server{ socketPath: socketPath, storage: store, storageCache: make(map[string]*StorageCacheEntry), maxCacheSize: maxCacheSize, cacheTTL: cacheTTL, shutdownChan: make(chan struct{}), startTime: time.Now(), metrics: NewMetrics(), maxConns: maxConns, connSemaphore: make(chan struct{}, maxConns), requestTimeout: requestTimeout, readyChan: make(chan struct{}), } } // Start starts the RPC server and listens for connections func (s *Server) Start(ctx context.Context) error { if err := s.ensureSocketDir(); err != nil { return fmt.Errorf("failed to ensure socket directory: %w", err) } if err := s.removeOldSocket(); err != nil { return fmt.Errorf("failed to remove old socket: %w", err) } listener, err := listenRPC(s.socketPath) if err != nil { return fmt.Errorf("failed to initialize RPC listener: %w", err) } s.listener = listener // Set socket permissions to 0600 for security (owner only) if runtime.GOOS != "windows" { if err := os.Chmod(s.socketPath, 0600); err != nil { listener.Close() return fmt.Errorf("failed to set socket permissions: %w", err) } } // Store listener under lock s.mu.Lock() s.listener = listener s.mu.Unlock() // Signal that server is ready to accept connections close(s.readyChan) go s.handleSignals() go s.runCleanupLoop() // Accept connections using listener for { // Get listener under lock s.mu.RLock() listener := s.listener s.mu.RUnlock() conn, err := listener.Accept() if err != nil { s.mu.Lock() shutdown := s.shutdown s.mu.Unlock() if shutdown { return nil } return fmt.Errorf("failed to accept connection: %w", err) } // Try to acquire connection slot (non-blocking) select { case s.connSemaphore <- struct{}{}: // Acquired slot, handle connection s.metrics.RecordConnection() go func(c net.Conn) { defer func() { <-s.connSemaphore }() // Release slot atomic.AddInt32(&s.activeConns, 1) defer atomic.AddInt32(&s.activeConns, -1) s.handleConnection(c) }(conn) default: // Max connections reached, reject immediately s.metrics.RecordRejectedConnection() conn.Close() } } } // WaitReady waits for the server to be ready to accept connections func (s *Server) WaitReady() <-chan struct{} { return s.readyChan } // Stop stops the RPC server and cleans up resources func (s *Server) Stop() error { var err error s.stopOnce.Do(func() { s.mu.Lock() s.shutdown = true s.mu.Unlock() // Signal cleanup goroutine to stop close(s.shutdownChan) // Close all cached storage connections outside lock s.cacheMu.Lock() stores := make([]storage.Storage, 0, len(s.storageCache)) for _, entry := range s.storageCache { stores = append(stores, entry.store) } s.storageCache = make(map[string]*StorageCacheEntry) s.cacheMu.Unlock() // Close stores without holding lock for _, store := range stores { if closeErr := store.Close(); closeErr != nil { fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", closeErr) } } // Close listener under lock s.mu.Lock() listener := s.listener s.listener = nil s.mu.Unlock() if listener != nil { if closeErr := listener.Close(); closeErr != nil { err = fmt.Errorf("failed to close listener: %w", closeErr) return } } if removeErr := s.removeOldSocket(); removeErr != nil { err = fmt.Errorf("failed to remove socket: %w", removeErr) } }) return err } func (s *Server) ensureSocketDir() error { dir := filepath.Dir(s.socketPath) if err := os.MkdirAll(dir, 0700); err != nil { return err } // Best-effort tighten permissions if directory already existed _ = os.Chmod(dir, 0700) return nil } func (s *Server) removeOldSocket() error { if _, err := os.Stat(s.socketPath); err == nil { // Socket exists - check if it's stale before removing // Try to connect to see if a daemon is actually using it conn, err := dialRPC(s.socketPath, 500*time.Millisecond) if err == nil { // Socket is active - another daemon is running conn.Close() return fmt.Errorf("socket %s is in use by another daemon", s.socketPath) } // Socket is stale - safe to remove if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) { return err } } return nil } func (s *Server) handleSignals() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, serverSignals...) <-sigChan s.Stop() } // runCleanupLoop periodically evicts stale storage connections and checks memory pressure func (s *Server) runCleanupLoop() { s.cleanupTicker = time.NewTicker(5 * time.Minute) defer s.cleanupTicker.Stop() for { select { case <-s.cleanupTicker.C: s.checkMemoryPressure() s.evictStaleStorage() case <-s.shutdownChan: return } } } // checkMemoryPressure monitors memory usage and triggers aggressive eviction if needed func (s *Server) checkMemoryPressure() { var m runtime.MemStats runtime.ReadMemStats(&m) // Memory thresholds (configurable via env var) const defaultThresholdMB = 500 thresholdMB := defaultThresholdMB if env := os.Getenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB"); env != "" { var threshold int if _, err := fmt.Sscanf(env, "%d", &threshold); err == nil && threshold > 0 { thresholdMB = threshold } } allocMB := m.Alloc / 1024 / 1024 if allocMB > uint64(thresholdMB) { fmt.Fprintf(os.Stderr, "Warning: High memory usage detected (%d MB), triggering aggressive cache eviction\n", allocMB) s.aggressiveEviction() runtime.GC() // Suggest garbage collection } } // aggressiveEviction evicts 50% of cached storage to reduce memory pressure func (s *Server) aggressiveEviction() { toClose := []storage.Storage{} s.cacheMu.Lock() if len(s.storageCache) == 0 { s.cacheMu.Unlock() return } // Build sorted list by last access type cacheItem struct { path string entry *StorageCacheEntry } items := make([]cacheItem, 0, len(s.storageCache)) for path, entry := range s.storageCache { items = append(items, cacheItem{path, entry}) } sort.Slice(items, func(i, j int) bool { return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) }) // Evict oldest 50% numToEvict := len(items) / 2 for i := 0; i < numToEvict; i++ { toClose = append(toClose, items[i].entry.store) delete(s.storageCache, items[i].path) } s.cacheMu.Unlock() // Close without holding lock for _, store := range toClose { if err := store.Close(); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) } } } // evictStaleStorage removes idle connections and enforces cache size limits func (s *Server) evictStaleStorage() { now := time.Now() toClose := []storage.Storage{} s.cacheMu.Lock() // First pass: evict TTL-expired entries for path, entry := range s.storageCache { if now.Sub(entry.lastAccess) > s.cacheTTL { toClose = append(toClose, entry.store) delete(s.storageCache, path) } } // Second pass: enforce max cache size with LRU eviction if len(s.storageCache) > s.maxCacheSize { // Build sorted list of entries by lastAccess type cacheItem struct { path string entry *StorageCacheEntry } items := make([]cacheItem, 0, len(s.storageCache)) for path, entry := range s.storageCache { items = append(items, cacheItem{path, entry}) } // Sort by lastAccess (oldest first) with sort.Slice sort.Slice(items, func(i, j int) bool { return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) }) // Evict oldest entries until we're under the limit numToEvict := len(s.storageCache) - s.maxCacheSize for i := 0; i < numToEvict && i < len(items); i++ { toClose = append(toClose, items[i].entry.store) delete(s.storageCache, items[i].path) s.metrics.RecordCacheEviction() } } // Unlock before closing to avoid holding lock during Close s.cacheMu.Unlock() // Close connections synchronously for _, store := range toClose { if err := store.Close(); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) } } } func (s *Server) handleConnection(conn net.Conn) { defer conn.Close() reader := bufio.NewReader(conn) writer := bufio.NewWriter(conn) for { // Set read deadline for the next request if err := conn.SetReadDeadline(time.Now().Add(s.requestTimeout)); err != nil { return } line, err := reader.ReadBytes('\n') if err != nil { return } var req Request if err := json.Unmarshal(line, &req); err != nil { resp := Response{ Success: false, Error: fmt.Sprintf("invalid request: %v", err), } s.writeResponse(writer, resp) continue } // Set write deadline for the response if err := conn.SetWriteDeadline(time.Now().Add(s.requestTimeout)); err != nil { return } resp := s.handleRequest(&req) s.writeResponse(writer, resp) } } // checkVersionCompatibility validates client version against server version // Returns error if versions are incompatible func (s *Server) checkVersionCompatibility(clientVersion string) error { // Allow empty client version (old clients before this feature) if clientVersion == "" { return nil } // Normalize versions to semver format (add 'v' prefix if missing) serverVer := ServerVersion if !strings.HasPrefix(serverVer, "v") { serverVer = "v" + serverVer } clientVer := clientVersion if !strings.HasPrefix(clientVer, "v") { clientVer = "v" + clientVer } // Validate versions are valid semver if !semver.IsValid(serverVer) || !semver.IsValid(clientVer) { // If either version is invalid, allow connection (dev builds, etc) return nil } // Extract major versions serverMajor := semver.Major(serverVer) clientMajor := semver.Major(clientVer) // Major version must match if serverMajor != clientMajor { cmp := semver.Compare(serverVer, clientVer) if cmp < 0 { // Daemon is older - needs upgrade return fmt.Errorf("incompatible major versions: client %s, daemon %s. Daemon is older; upgrade and restart daemon: 'bd daemon --stop && bd daemon'", clientVersion, ServerVersion) } // Daemon is newer - client needs upgrade return fmt.Errorf("incompatible major versions: client %s, daemon %s. Client is older; upgrade the bd CLI to match the daemon's major version", clientVersion, ServerVersion) } // Compare full versions - daemon should be >= client for backward compatibility cmp := semver.Compare(serverVer, clientVer) if cmp < 0 { // Server is older than client within same major version - may be missing features return fmt.Errorf("version mismatch: daemon %s is older than client %s. Upgrade and restart daemon: 'bd daemon --stop && bd daemon'", ServerVersion, clientVersion) } // Client is same version or older - OK (daemon supports backward compat within major version) return nil } // validateDatabaseBinding validates that the client is connecting to the correct daemon // Returns error if ExpectedDB is set and doesn't match the daemon's database path func (s *Server) validateDatabaseBinding(req *Request) error { // If client doesn't specify ExpectedDB, allow but log warning (old clients) if req.ExpectedDB == "" { // Log warning for audit trail fmt.Fprintf(os.Stderr, "Warning: Client request without database binding validation (old client or missing ExpectedDB)\n") return nil } // For multi-database daemons: If a cwd is provided, verify the client expects // the database that would be selected for that cwd var daemonDB string if req.Cwd != "" { // Use the database discovery logic to find which DB would be used dbPath := s.findDatabaseForCwd(req.Cwd) if dbPath != "" { daemonDB = dbPath } else { // No database found for cwd, will fall back to default storage daemonDB = s.storage.Path() } } else { // No cwd provided, use default storage daemonDB = s.storage.Path() } // Normalize both paths for comparison (resolve symlinks, clean paths) expectedPath, err := filepath.EvalSymlinks(req.ExpectedDB) if err != nil { // If we can't resolve expected path, use it as-is expectedPath = filepath.Clean(req.ExpectedDB) } daemonPath, err := filepath.EvalSymlinks(daemonDB) if err != nil { // If we can't resolve daemon path, use it as-is daemonPath = filepath.Clean(daemonDB) } // Compare paths if expectedPath != daemonPath { return fmt.Errorf("database mismatch: client expects %s but daemon serves %s. Wrong daemon connection - check socket path", req.ExpectedDB, daemonDB) } return nil } func (s *Server) handleRequest(req *Request) Response { // Track request timing start := time.Now() // Defer metrics recording to ensure it always happens defer func() { latency := time.Since(start) s.metrics.RecordRequest(req.Operation, latency) }() // Validate database binding (skip for health/metrics to allow diagnostics) if req.Operation != OpHealth && req.Operation != OpMetrics { if err := s.validateDatabaseBinding(req); err != nil { s.metrics.RecordError(req.Operation) return Response{ Success: false, Error: err.Error(), } } } // Check version compatibility (skip for ping/health to allow version checks) if req.Operation != OpPing && req.Operation != OpHealth { if err := s.checkVersionCompatibility(req.ClientVersion); err != nil { s.metrics.RecordError(req.Operation) return Response{ Success: false, Error: err.Error(), } } } var resp Response switch req.Operation { case OpPing: resp = s.handlePing(req) case OpHealth: resp = s.handleHealth(req) case OpMetrics: resp = s.handleMetrics(req) case OpCreate: resp = s.handleCreate(req) case OpUpdate: resp = s.handleUpdate(req) case OpClose: resp = s.handleClose(req) case OpList: resp = s.handleList(req) case OpShow: resp = s.handleShow(req) case OpReady: resp = s.handleReady(req) case OpStats: resp = s.handleStats(req) case OpDepAdd: resp = s.handleDepAdd(req) case OpDepRemove: resp = s.handleDepRemove(req) case OpLabelAdd: resp = s.handleLabelAdd(req) case OpLabelRemove: resp = s.handleLabelRemove(req) case OpCommentList: resp = s.handleCommentList(req) case OpCommentAdd: resp = s.handleCommentAdd(req) case OpBatch: resp = s.handleBatch(req) case OpReposList: resp = s.handleReposList(req) case OpReposReady: resp = s.handleReposReady(req) case OpReposStats: resp = s.handleReposStats(req) case OpReposClearCache: resp = s.handleReposClearCache(req) case OpCompact: resp = s.handleCompact(req) case OpCompactStats: resp = s.handleCompactStats(req) default: s.metrics.RecordError(req.Operation) return Response{ Success: false, Error: fmt.Sprintf("unknown operation: %s", req.Operation), } } // Record error if request failed if !resp.Success { s.metrics.RecordError(req.Operation) } return resp } // Adapter helpers func (s *Server) reqCtx(_ *Request) context.Context { return context.Background() } func (s *Server) reqActor(req *Request) string { if req != nil && req.Actor != "" { return req.Actor } return "daemon" } func strValue(p *string) string { if p == nil { return "" } return *p } func strPtr(s string) *string { if s == "" { return nil } return &s } func updatesFromArgs(a UpdateArgs) map[string]interface{} { u := map[string]interface{}{} if a.Title != nil { u["title"] = *a.Title } if a.Status != nil { u["status"] = *a.Status } if a.Priority != nil { u["priority"] = *a.Priority } if a.Design != nil { u["design"] = a.Design } if a.AcceptanceCriteria != nil { u["acceptance_criteria"] = a.AcceptanceCriteria } if a.Notes != nil { u["notes"] = a.Notes } if a.Assignee != nil { u["assignee"] = a.Assignee } return u } // Handler implementations func (s *Server) handlePing(_ *Request) Response { data, _ := json.Marshal(PingResponse{ Message: "pong", Version: ServerVersion, }) return Response{ Success: true, Data: data, } } func (s *Server) handleHealth(req *Request) Response { start := time.Now() // Get memory stats for health response var m runtime.MemStats runtime.ReadMemStats(&m) store, err := s.getStorageForRequest(req) if err != nil { data, _ := json.Marshal(HealthResponse{ Status: "unhealthy", Version: ServerVersion, Uptime: time.Since(s.startTime).Seconds(), Error: fmt.Sprintf("storage error: %v", err), }) return Response{ Success: false, Data: data, Error: fmt.Sprintf("storage error: %v", err), } } healthCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() status := "healthy" dbError := "" _, pingErr := store.GetStatistics(healthCtx) dbResponseMs := time.Since(start).Seconds() * 1000 if pingErr != nil { status = "unhealthy" dbError = pingErr.Error() } else if dbResponseMs > 500 { status = "degraded" } s.cacheMu.RLock() cacheSize := len(s.storageCache) s.cacheMu.RUnlock() // Check version compatibility compatible := true if req.ClientVersion != "" { if err := s.checkVersionCompatibility(req.ClientVersion); err != nil { compatible = false } } health := HealthResponse{ Status: status, Version: ServerVersion, ClientVersion: req.ClientVersion, Compatible: compatible, Uptime: time.Since(s.startTime).Seconds(), CacheSize: cacheSize, CacheHits: atomic.LoadInt64(&s.cacheHits), CacheMisses: atomic.LoadInt64(&s.cacheMisses), DBResponseTime: dbResponseMs, ActiveConns: atomic.LoadInt32(&s.activeConns), MaxConns: s.maxConns, MemoryAllocMB: m.Alloc / 1024 / 1024, } if dbError != "" { health.Error = dbError } data, _ := json.Marshal(health) return Response{ Success: status != "unhealthy", Data: data, Error: dbError, } } func (s *Server) handleMetrics(_ *Request) Response { s.cacheMu.RLock() cacheSize := len(s.storageCache) s.cacheMu.RUnlock() snapshot := s.metrics.Snapshot( atomic.LoadInt64(&s.cacheHits), atomic.LoadInt64(&s.cacheMisses), cacheSize, int(atomic.LoadInt32(&s.activeConns)), ) data, _ := json.Marshal(snapshot) return Response{ Success: true, Data: data, } } func (s *Server) handleCreate(req *Request) Response { var createArgs CreateArgs if err := json.Unmarshal(req.Args, &createArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid create args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } var design, acceptance, assignee *string if createArgs.Design != "" { design = &createArgs.Design } if createArgs.AcceptanceCriteria != "" { acceptance = &createArgs.AcceptanceCriteria } if createArgs.Assignee != "" { assignee = &createArgs.Assignee } issue := &types.Issue{ ID: createArgs.ID, Title: createArgs.Title, Description: createArgs.Description, IssueType: types.IssueType(createArgs.IssueType), Priority: createArgs.Priority, Design: strValue(design), AcceptanceCriteria: strValue(acceptance), Assignee: strValue(assignee), Status: types.StatusOpen, } ctx := s.reqCtx(req) if err := store.CreateIssue(ctx, issue, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to create issue: %v", err), } } // Add labels if specified for _, label := range createArgs.Labels { if err := store.AddLabel(ctx, issue.ID, label, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to add label %s: %v", label, err), } } } // Add dependencies if specified for _, depSpec := range createArgs.Dependencies { depSpec = strings.TrimSpace(depSpec) if depSpec == "" { continue } var depType types.DependencyType var dependsOnID string if strings.Contains(depSpec, ":") { parts := strings.SplitN(depSpec, ":", 2) if len(parts) != 2 { return Response{ Success: false, Error: fmt.Sprintf("invalid dependency format '%s', expected 'type:id' or 'id'", depSpec), } } depType = types.DependencyType(strings.TrimSpace(parts[0])) dependsOnID = strings.TrimSpace(parts[1]) } else { depType = types.DepBlocks dependsOnID = depSpec } if !depType.IsValid() { return Response{ Success: false, Error: fmt.Sprintf("invalid dependency type '%s' (valid: blocks, related, parent-child, discovered-from)", depType), } } dep := &types.Dependency{ IssueID: issue.ID, DependsOnID: dependsOnID, Type: depType, } if err := store.AddDependency(ctx, dep, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to add dependency %s -> %s: %v", issue.ID, dependsOnID, err), } } } data, _ := json.Marshal(issue) return Response{ Success: true, Data: data, } } func (s *Server) handleUpdate(req *Request) Response { var updateArgs UpdateArgs if err := json.Unmarshal(req.Args, &updateArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid update args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) updates := updatesFromArgs(updateArgs) if len(updates) == 0 { return Response{Success: true} } if err := store.UpdateIssue(ctx, updateArgs.ID, updates, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to update issue: %v", err), } } issue, err := store.GetIssue(ctx, updateArgs.ID) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get updated issue: %v", err), } } data, _ := json.Marshal(issue) return Response{ Success: true, Data: data, } } func (s *Server) handleClose(req *Request) Response { var closeArgs CloseArgs if err := json.Unmarshal(req.Args, &closeArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid close args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) if err := store.CloseIssue(ctx, closeArgs.ID, closeArgs.Reason, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to close issue: %v", err), } } issue, _ := store.GetIssue(ctx, closeArgs.ID) data, _ := json.Marshal(issue) return Response{ Success: true, Data: data, } } func (s *Server) handleList(req *Request) Response { var listArgs ListArgs if err := json.Unmarshal(req.Args, &listArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid list args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } filter := types.IssueFilter{ Limit: listArgs.Limit, } if listArgs.Status != "" { status := types.Status(listArgs.Status) filter.Status = &status } if listArgs.IssueType != "" { issueType := types.IssueType(listArgs.IssueType) filter.IssueType = &issueType } if listArgs.Assignee != "" { filter.Assignee = &listArgs.Assignee } if listArgs.Priority != nil { filter.Priority = listArgs.Priority } // Normalize and apply label filters labels := normalizeLabels(listArgs.Labels) labelsAny := normalizeLabels(listArgs.LabelsAny) // Support both old single Label and new Labels array if len(labels) > 0 { filter.Labels = labels } else if listArgs.Label != "" { filter.Labels = []string{strings.TrimSpace(listArgs.Label)} } if len(labelsAny) > 0 { filter.LabelsAny = labelsAny } if len(listArgs.IDs) > 0 { ids := normalizeLabels(listArgs.IDs) if len(ids) > 0 { filter.IDs = ids } } // Guard against excessive ID lists to avoid SQLite parameter limits const maxIDs = 1000 if len(filter.IDs) > maxIDs { return Response{ Success: false, Error: fmt.Sprintf("--id flag supports at most %d issue IDs, got %d", maxIDs, len(filter.IDs)), } } ctx := s.reqCtx(req) issues, err := store.SearchIssues(ctx, listArgs.Query, filter) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to list issues: %v", err), } } // Populate labels for each issue for _, issue := range issues { labels, _ := store.GetLabels(ctx, issue.ID) issue.Labels = labels } data, _ := json.Marshal(issues) return Response{ Success: true, Data: data, } } func (s *Server) handleShow(req *Request) Response { var showArgs ShowArgs if err := json.Unmarshal(req.Args, &showArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid show args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) issue, err := store.GetIssue(ctx, showArgs.ID) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get issue: %v", err), } } // Populate labels, dependencies, and dependents labels, _ := store.GetLabels(ctx, issue.ID) deps, _ := store.GetDependencies(ctx, issue.ID) dependents, _ := store.GetDependents(ctx, issue.ID) // Create detailed response with related data type IssueDetails struct { *types.Issue Labels []string `json:"labels,omitempty"` Dependencies []*types.Issue `json:"dependencies,omitempty"` Dependents []*types.Issue `json:"dependents,omitempty"` } details := &IssueDetails{ Issue: issue, Labels: labels, Dependencies: deps, Dependents: dependents, } data, _ := json.Marshal(details) return Response{ Success: true, Data: data, } } func (s *Server) handleReady(req *Request) Response { var readyArgs ReadyArgs if err := json.Unmarshal(req.Args, &readyArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid ready args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } wf := types.WorkFilter{ Status: types.StatusOpen, Priority: readyArgs.Priority, Limit: readyArgs.Limit, } if readyArgs.Assignee != "" { wf.Assignee = &readyArgs.Assignee } ctx := s.reqCtx(req) issues, err := store.GetReadyWork(ctx, wf) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get ready work: %v", err), } } data, _ := json.Marshal(issues) return Response{ Success: true, Data: data, } } func (s *Server) handleStats(req *Request) Response { store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) stats, err := store.GetStatistics(ctx) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get statistics: %v", err), } } data, _ := json.Marshal(stats) return Response{ Success: true, Data: data, } } func (s *Server) handleDepAdd(req *Request) Response { var depArgs DepAddArgs if err := json.Unmarshal(req.Args, &depArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid dep add args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } dep := &types.Dependency{ IssueID: depArgs.FromID, DependsOnID: depArgs.ToID, Type: types.DependencyType(depArgs.DepType), } ctx := s.reqCtx(req) if err := store.AddDependency(ctx, dep, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to add dependency: %v", err), } } return Response{Success: true} } func (s *Server) handleDepRemove(req *Request) Response { var depArgs DepRemoveArgs if err := json.Unmarshal(req.Args, &depArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid dep remove args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) if err := store.RemoveDependency(ctx, depArgs.FromID, depArgs.ToID, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to remove dependency: %v", err), } } return Response{Success: true} } func (s *Server) handleLabelAdd(req *Request) Response { var labelArgs LabelAddArgs if err := json.Unmarshal(req.Args, &labelArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid label add args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) if err := store.AddLabel(ctx, labelArgs.ID, labelArgs.Label, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to add label: %v", err), } } return Response{Success: true} } func (s *Server) handleLabelRemove(req *Request) Response { var labelArgs LabelRemoveArgs if err := json.Unmarshal(req.Args, &labelArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid label remove args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) if err := store.RemoveLabel(ctx, labelArgs.ID, labelArgs.Label, s.reqActor(req)); err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to remove label: %v", err), } } return Response{Success: true} } func (s *Server) handleCommentList(req *Request) Response { var commentArgs CommentListArgs if err := json.Unmarshal(req.Args, &commentArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid comment list args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) comments, err := store.GetIssueComments(ctx, commentArgs.ID) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to list comments: %v", err), } } data, _ := json.Marshal(comments) return Response{ Success: true, Data: data, } } func (s *Server) handleCommentAdd(req *Request) Response { var commentArgs CommentAddArgs if err := json.Unmarshal(req.Args, &commentArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid comment add args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("storage error: %v", err), } } ctx := s.reqCtx(req) comment, err := store.AddIssueComment(ctx, commentArgs.ID, commentArgs.Author, commentArgs.Text) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to add comment: %v", err), } } data, _ := json.Marshal(comment) return Response{ Success: true, Data: data, } } func (s *Server) handleBatch(req *Request) Response { var batchArgs BatchArgs if err := json.Unmarshal(req.Args, &batchArgs); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid batch args: %v", err), } } results := make([]BatchResult, 0, len(batchArgs.Operations)) for _, op := range batchArgs.Operations { subReq := &Request{ Operation: op.Operation, Args: op.Args, Actor: req.Actor, RequestID: req.RequestID, Cwd: req.Cwd, // Pass through context ClientVersion: req.ClientVersion, // Pass through version for compatibility checks } resp := s.handleRequest(subReq) results = append(results, BatchResult{ Success: resp.Success, Data: resp.Data, Error: resp.Error, }) if !resp.Success { break } } batchResp := BatchResponse{Results: results} data, _ := json.Marshal(batchResp) return Response{ Success: true, Data: data, } } // getStorageForRequest returns the appropriate storage for the request // If req.Cwd is set, it finds the database for that directory // Otherwise, it uses the default storage func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) { // If no cwd specified, use default storage if req.Cwd == "" { return s.storage, nil } // Find database for this cwd (to get the canonical repo root) dbPath := s.findDatabaseForCwd(req.Cwd) if dbPath == "" { return nil, fmt.Errorf("no .beads database found for path: %s", req.Cwd) } // Canonicalize key to repo root (parent of .beads directory) beadsDir := filepath.Dir(dbPath) repoRoot := filepath.Dir(beadsDir) // Check cache first with write lock (to avoid race on lastAccess update) s.cacheMu.Lock() defer s.cacheMu.Unlock() if entry, ok := s.storageCache[repoRoot]; ok { // Check if DB file has been modified externally info, err := os.Stat(dbPath) if err == nil && !info.ModTime().Equal(entry.dbMtime) { // DB file changed - evict stale cache entry // Remove from cache first to prevent concurrent access delete(s.storageCache, repoRoot) atomic.AddInt64(&s.cacheMisses, 1) // Close storage after removing from cache (safe now) // Unlock briefly to avoid blocking during Close() s.cacheMu.Unlock() if err := entry.store.Close(); err != nil { // Log but don't fail - we'll reopen anyway fmt.Fprintf(os.Stderr, "Warning: failed to close stale cached storage: %v\n", err) } s.cacheMu.Lock() // Fall through to reopen } else if err == nil { // Cache hit - DB file unchanged entry.lastAccess = time.Now() atomic.AddInt64(&s.cacheHits, 1) return entry.store, nil } else { // Stat failed - evict and reopen // Remove from cache first to prevent concurrent access delete(s.storageCache, repoRoot) atomic.AddInt64(&s.cacheMisses, 1) // Close storage after removing from cache s.cacheMu.Unlock() if err := entry.store.Close(); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to close cached storage: %v\n", err) } s.cacheMu.Lock() // Fall through to reopen } } else { atomic.AddInt64(&s.cacheMisses, 1) } // Open storage store, err := sqlite.New(dbPath) if err != nil { return nil, fmt.Errorf("failed to open database at %s: %w", dbPath, err) } // Get mtime for the newly opened DB info, err := os.Stat(dbPath) if err != nil { // If we can't stat, still cache it but with zero mtime (will invalidate on next check) info = nil } mtime := time.Time{} if info != nil { mtime = info.ModTime() } // Cache it with current timestamp and mtime s.storageCache[repoRoot] = &StorageCacheEntry{ store: store, lastAccess: time.Now(), dbMtime: mtime, } // Enforce LRU immediately to prevent FD spikes between cleanup ticks needEvict := len(s.storageCache) > s.maxCacheSize s.cacheMu.Unlock() if needEvict { s.evictStaleStorage() } // Re-acquire lock for defer s.cacheMu.Lock() return store, nil } // findDatabaseForCwd walks up from cwd to find .beads/*.db func (s *Server) findDatabaseForCwd(cwd string) string { dir, err := filepath.Abs(cwd) if err != nil { return "" } // Walk up directory tree for { beadsDir := filepath.Join(dir, ".beads") if info, err := os.Stat(beadsDir); err == nil && info.IsDir() { // Found .beads/ directory, look for *.db files matches, err := filepath.Glob(filepath.Join(beadsDir, "*.db")) if err == nil && len(matches) > 0 { return matches[0] } } // Move up one directory parent := filepath.Dir(dir) if parent == dir { // Reached filesystem root break } dir = parent } return "" } func (s *Server) writeResponse(writer *bufio.Writer, resp Response) { data, _ := json.Marshal(resp) writer.Write(data) writer.WriteByte('\n') writer.Flush() } // Multi-repo handlers func (s *Server) handleReposList(_ *Request) Response { // Keep read lock during iteration to prevent stores from being closed mid-query s.cacheMu.RLock() defer s.cacheMu.RUnlock() repos := make([]RepoInfo, 0, len(s.storageCache)) for path, entry := range s.storageCache { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) stats, err := entry.store.GetStatistics(ctx) cancel() if err != nil { continue } // Extract prefix from a sample issue filter := types.IssueFilter{Limit: 1} ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second) issues, err := entry.store.SearchIssues(ctx2, "", filter) cancel2() prefix := "" if err == nil && len(issues) > 0 && len(issues[0].ID) > 0 { // Extract prefix (everything before the last hyphen and number) id := issues[0].ID for i := len(id) - 1; i >= 0; i-- { if id[i] == '-' { prefix = id[:i+1] break } } } repos = append(repos, RepoInfo{ Path: path, Prefix: prefix, IssueCount: stats.TotalIssues, LastAccess: entry.lastAccess.Format(time.RFC3339), }) } data, _ := json.Marshal(repos) return Response{ Success: true, Data: data, } } func (s *Server) handleReposReady(req *Request) Response { var args ReposReadyArgs if err := json.Unmarshal(req.Args, &args); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid args: %v", err), } } // Keep read lock during iteration to prevent stores from being closed mid-query s.cacheMu.RLock() defer s.cacheMu.RUnlock() if args.GroupByRepo { result := make([]RepoReadyWork, 0, len(s.storageCache)) for path, entry := range s.storageCache { filter := types.WorkFilter{ Status: types.StatusOpen, Limit: args.Limit, } if args.Priority != nil { filter.Priority = args.Priority } if args.Assignee != "" { filter.Assignee = &args.Assignee } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) issues, err := entry.store.GetReadyWork(ctx, filter) cancel() if err != nil || len(issues) == 0 { continue } result = append(result, RepoReadyWork{ RepoPath: path, Issues: issues, }) } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } } // Flat list of all ready issues across all repos allIssues := make([]ReposReadyIssue, 0) for path, entry := range s.storageCache { filter := types.WorkFilter{ Status: types.StatusOpen, Limit: args.Limit, } if args.Priority != nil { filter.Priority = args.Priority } if args.Assignee != "" { filter.Assignee = &args.Assignee } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) issues, err := entry.store.GetReadyWork(ctx, filter) cancel() if err != nil { continue } for _, issue := range issues { allIssues = append(allIssues, ReposReadyIssue{ RepoPath: path, Issue: issue, }) } } data, _ := json.Marshal(allIssues) return Response{ Success: true, Data: data, } } func (s *Server) handleReposStats(_ *Request) Response { // Keep read lock during iteration to prevent stores from being closed mid-query s.cacheMu.RLock() defer s.cacheMu.RUnlock() total := types.Statistics{} perRepo := make(map[string]types.Statistics) errors := make(map[string]string) for path, entry := range s.storageCache { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) stats, err := entry.store.GetStatistics(ctx) cancel() if err != nil { errors[path] = err.Error() continue } perRepo[path] = *stats // Aggregate totals total.TotalIssues += stats.TotalIssues total.OpenIssues += stats.OpenIssues total.InProgressIssues += stats.InProgressIssues total.ClosedIssues += stats.ClosedIssues total.BlockedIssues += stats.BlockedIssues total.ReadyIssues += stats.ReadyIssues total.EpicsEligibleForClosure += stats.EpicsEligibleForClosure } result := ReposStatsResponse{ Total: total, PerRepo: perRepo, } if len(errors) > 0 { result.Errors = errors } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } } func (s *Server) handleReposClearCache(_ *Request) Response { // Copy stores under write lock, clear cache, then close outside lock // to avoid holding lock during potentially slow Close() operations s.cacheMu.Lock() stores := make([]storage.Storage, 0, len(s.storageCache)) for _, entry := range s.storageCache { stores = append(stores, entry.store) } s.storageCache = make(map[string]*StorageCacheEntry) s.cacheMu.Unlock() // Close all storage connections without holding lock for _, store := range stores { if err := store.Close(); err != nil { fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", err) } } return Response{ Success: true, Data: json.RawMessage(`{"message":"Cache cleared successfully"}`), } } func (s *Server) handleCompact(req *Request) Response { var args CompactArgs if err := json.Unmarshal(req.Args, &args); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid compact args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get storage: %v", err), } } sqliteStore, ok := store.(*sqlite.SQLiteStorage) if !ok { return Response{ Success: false, Error: "compact requires SQLite storage", } } config := &compact.CompactConfig{ APIKey: args.APIKey, Concurrency: args.Workers, DryRun: args.DryRun, } if config.Concurrency <= 0 { config.Concurrency = 5 } compactor, err := compact.New(sqliteStore, args.APIKey, config) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to create compactor: %v", err), } } ctx := s.reqCtx(req) startTime := time.Now() if args.IssueID != "" { if !args.Force { eligible, reason, err := sqliteStore.CheckEligibility(ctx, args.IssueID, args.Tier) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to check eligibility: %v", err), } } if !eligible { return Response{ Success: false, Error: fmt.Sprintf("%s is not eligible for Tier %d compaction: %s", args.IssueID, args.Tier, reason), } } } issue, err := sqliteStore.GetIssue(ctx, args.IssueID) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get issue: %v", err), } } originalSize := len(issue.Description) + len(issue.Design) + len(issue.Notes) + len(issue.AcceptanceCriteria) if args.DryRun { result := CompactResponse{ Success: true, IssueID: args.IssueID, OriginalSize: originalSize, Reduction: "70-80%", DryRun: true, } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } } if args.Tier == 1 { err = compactor.CompactTier1(ctx, args.IssueID) } else { return Response{ Success: false, Error: "Tier 2 compaction not yet implemented", } } if err != nil { return Response{ Success: false, Error: fmt.Sprintf("compaction failed: %v", err), } } issueAfter, _ := sqliteStore.GetIssue(ctx, args.IssueID) compactedSize := 0 if issueAfter != nil { compactedSize = len(issueAfter.Description) } duration := time.Since(startTime) result := CompactResponse{ Success: true, IssueID: args.IssueID, OriginalSize: originalSize, CompactedSize: compactedSize, Reduction: fmt.Sprintf("%.1f%%", float64(originalSize-compactedSize)/float64(originalSize)*100), Duration: duration.String(), } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } } if args.All { var candidates []*sqlite.CompactionCandidate if args.Tier == 1 { tier1, err := sqliteStore.GetTier1Candidates(ctx) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get Tier 1 candidates: %v", err), } } candidates = tier1 } else if args.Tier == 2 { tier2, err := sqliteStore.GetTier2Candidates(ctx) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get Tier 2 candidates: %v", err), } } candidates = tier2 } else { return Response{ Success: false, Error: fmt.Sprintf("invalid tier: %d (must be 1 or 2)", args.Tier), } } if len(candidates) == 0 { result := CompactResponse{ Success: true, Results: []CompactResult{}, } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } } issueIDs := make([]string, len(candidates)) for i, c := range candidates { issueIDs[i] = c.IssueID } batchResults, err := compactor.CompactTier1Batch(ctx, issueIDs) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("batch compaction failed: %v", err), } } results := make([]CompactResult, 0, len(batchResults)) for _, r := range batchResults { result := CompactResult{ IssueID: r.IssueID, Success: r.Err == nil, OriginalSize: r.OriginalSize, CompactedSize: r.CompactedSize, } if r.Err != nil { result.Error = r.Err.Error() } else if r.OriginalSize > 0 && r.CompactedSize > 0 { result.Reduction = fmt.Sprintf("%.1f%%", float64(r.OriginalSize-r.CompactedSize)/float64(r.OriginalSize)*100) } results = append(results, result) } duration := time.Since(startTime) response := CompactResponse{ Success: true, Results: results, Duration: duration.String(), DryRun: args.DryRun, } data, _ := json.Marshal(response) return Response{ Success: true, Data: data, } } return Response{ Success: false, Error: "must specify --all or --id", } } func (s *Server) handleCompactStats(req *Request) Response { var args CompactStatsArgs if err := json.Unmarshal(req.Args, &args); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid compact stats args: %v", err), } } store, err := s.getStorageForRequest(req) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get storage: %v", err), } } sqliteStore, ok := store.(*sqlite.SQLiteStorage) if !ok { return Response{ Success: false, Error: "compact stats requires SQLite storage", } } ctx := s.reqCtx(req) tier1, err := sqliteStore.GetTier1Candidates(ctx) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get Tier 1 candidates: %v", err), } } tier2, err := sqliteStore.GetTier2Candidates(ctx) if err != nil { return Response{ Success: false, Error: fmt.Sprintf("failed to get Tier 2 candidates: %v", err), } } stats := CompactStatsData{ Tier1Candidates: len(tier1), Tier2Candidates: len(tier2), Tier1MinAge: "30 days", Tier2MinAge: "90 days", TotalClosed: 0, // Could query for this but not critical } result := CompactResponse{ Success: true, Stats: &stats, } data, _ := json.Marshal(result) return Response{ Success: true, Data: data, } }