From 781e300d3341885d3cc42a11dd126014e952dbb4 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Tue, 28 Oct 2025 13:12:37 -0700 Subject: [PATCH] Add event-driven daemon architecture (Phase 1 foundation) - Add fsnotify dependency for file watching - Create daemon_debouncer.go: batch rapid events (500ms window) - Create daemon_watcher.go: monitor JSONL and git refs changes - Create daemon_event_loop.go: event-driven sync loop - Add mutation channel to RPC server (create/update/close events) - Add BEADS_DAEMON_MODE env var (poll/events, default: poll) Phase 1 implementation: opt-in via BEADS_DAEMON_MODE=events Target: <500ms latency (vs 5000ms), ~60% CPU reduction Related: bd-49 (epic), bd-50, bd-51, bd-53, bd-54, bd-55, bd-56 Amp-Thread-ID: https://ampcode.com/threads/T-35a3d0d7-4e19-421d-8392-63755035036e Co-authored-by: Amp --- cmd/bd/daemon.go | 27 +- cmd/bd/daemon_debouncer.go | 55 ++ cmd/bd/daemon_event_loop.go | 124 +++ cmd/bd/daemon_watcher.go | 97 ++ event_driven_daemon.md | 788 +++++++++++++++ internal/rpc/server_core.go | 30 + internal/rpc/server_issues_epics.go | 9 + repair_commands.md | 1398 +++++++++++++++++++++++++++ 8 files changed, 2527 insertions(+), 1 deletion(-) create mode 100644 cmd/bd/daemon_debouncer.go create mode 100644 cmd/bd/daemon_event_loop.go create mode 100644 cmd/bd/daemon_watcher.go create mode 100644 event_driven_daemon.md create mode 100644 repair_commands.md diff --git a/cmd/bd/daemon.go b/cmd/bd/daemon.go index 2517cf1a..61d8d92f 100644 --- a/cmd/bd/daemon.go +++ b/cmd/bd/daemon.go @@ -1299,5 +1299,30 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p doSync := createSyncFunc(ctx, store, autoCommit, autoPush, log) doSync() - runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log) + // Choose event loop based on BEADS_DAEMON_MODE + daemonMode := os.Getenv("BEADS_DAEMON_MODE") + if daemonMode == "" { + daemonMode = "poll" // Default to polling for Phase 1 + } + + switch daemonMode { + 
case "events": + log.log("Using event-driven mode") + // For Phase 1: event-driven mode uses full sync on both export and import events + // TODO: Optimize to separate export-only and import-only triggers + jsonlPath := findJSONLPath() + if jsonlPath == "" { + log.log("Error: JSONL path not found, cannot use event-driven mode") + log.log("Falling back to polling mode") + runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log) + } else { + runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doSync, doSync, log) + } + case "poll": + log.log("Using polling mode (interval: %v)", interval) + runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log) + default: + log.log("Unknown BEADS_DAEMON_MODE: %s (valid: poll, events), defaulting to poll", daemonMode) + runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log) + } } diff --git a/cmd/bd/daemon_debouncer.go b/cmd/bd/daemon_debouncer.go new file mode 100644 index 00000000..3641325f --- /dev/null +++ b/cmd/bd/daemon_debouncer.go @@ -0,0 +1,55 @@ +package main + +import ( + "sync" + "time" +) + +// Debouncer batches rapid events into a single action after a quiet period. +// Thread-safe for concurrent triggers. +type Debouncer struct { + mu sync.Mutex + timer *time.Timer + duration time.Duration + action func() +} + +// NewDebouncer creates a new debouncer with the given duration and action. +// The action will be called once after the duration has passed since the last trigger. +func NewDebouncer(duration time.Duration, action func()) *Debouncer { + return &Debouncer{ + duration: duration, + action: action, + } +} + +// Trigger schedules the action to run after the debounce duration. +// If called multiple times, the timer is reset each time, ensuring +// the action only fires once after the last trigger. 
+func (d *Debouncer) Trigger() { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + } + + d.timer = time.AfterFunc(d.duration, func() { + d.action() + d.mu.Lock() + d.timer = nil + d.mu.Unlock() + }) +} + +// Cancel stops any pending debounced action. +// Safe to call even if no action is pending. +func (d *Debouncer) Cancel() { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + d.timer = nil + } +} diff --git a/cmd/bd/daemon_event_loop.go b/cmd/bd/daemon_event_loop.go new file mode 100644 index 00000000..91498a0c --- /dev/null +++ b/cmd/bd/daemon_event_loop.go @@ -0,0 +1,124 @@ +package main + +import ( + "context" + "os" + "os/signal" + "time" + + "github.com/steveyegge/beads/internal/rpc" + "github.com/steveyegge/beads/internal/storage" +) + +// runEventDrivenLoop implements event-driven daemon architecture. +// Replaces polling ticker with reactive event handlers for: +// - File system changes (JSONL modifications) +// - RPC mutations (create, update, delete) +// - Git operations (via hooks, optional) +func runEventDrivenLoop( + ctx context.Context, + cancel context.CancelFunc, + server *rpc.Server, + serverErrChan chan error, + store storage.Storage, + jsonlPath string, + doExport func(), + doAutoImport func(), + log daemonLogger, +) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, daemonSignals...) 
+ defer signal.Stop(sigChan) + + // Debounced sync actions + exportDebouncer := NewDebouncer(500*time.Millisecond, func() { + log.log("Export triggered by mutation events") + doExport() + }) + defer exportDebouncer.Cancel() + + importDebouncer := NewDebouncer(500*time.Millisecond, func() { + log.log("Import triggered by file change") + doAutoImport() + }) + defer importDebouncer.Cancel() + + // Start file watcher for JSONL changes + watcher, err := NewFileWatcher(jsonlPath, func() { + importDebouncer.Trigger() + }) + if err != nil { + log.log("WARNING: File watcher unavailable (%v), mutations will trigger export only", err) + watcher = nil + } else { + watcher.Start(ctx, log) + defer watcher.Close() + } + + // Handle mutation events from RPC server + mutationChan := server.MutationChan() + go func() { + for { + select { + case event := <-mutationChan: + log.log("Mutation detected: %s %s", event.Type, event.IssueID) + exportDebouncer.Trigger() + + case <-ctx.Done(): + return + } + } + }() + + // Optional: Periodic health check (not a sync poll) + healthTicker := time.NewTicker(60 * time.Second) + defer healthTicker.Stop() + + for { + select { + case <-healthTicker.C: + // Periodic health validation (not sync) + checkDaemonHealth(ctx, store, log) + + case sig := <-sigChan: + if isReloadSignal(sig) { + log.log("Received reload signal, ignoring") + continue + } + log.log("Received signal %v, shutting down...", sig) + cancel() + if err := server.Stop(); err != nil { + log.log("Error stopping server: %v", err) + } + return + + case <-ctx.Done(): + log.log("Context canceled, shutting down") + if watcher != nil { + watcher.Close() + } + if err := server.Stop(); err != nil { + log.log("Error stopping server: %v", err) + } + return + + case err := <-serverErrChan: + log.log("RPC server failed: %v", err) + cancel() + if watcher != nil { + watcher.Close() + } + return + } + } +} + +// checkDaemonHealth performs periodic health validation. 
+// Separate from sync operations - just validates state. +func checkDaemonHealth(ctx context.Context, store storage.Storage, log daemonLogger) { + // TODO: Add health checks: + // - Database integrity check + // - Disk space check + // - Memory usage check + // For now, this is a no-op placeholder +} diff --git a/cmd/bd/daemon_watcher.go b/cmd/bd/daemon_watcher.go new file mode 100644 index 00000000..1104f0eb --- /dev/null +++ b/cmd/bd/daemon_watcher.go @@ -0,0 +1,97 @@ +package main + +import ( + "context" + "fmt" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" +) + +// FileWatcher monitors JSONL and git ref changes using filesystem events. +type FileWatcher struct { + watcher *fsnotify.Watcher + debouncer *Debouncer + jsonlPath string +} + +// NewFileWatcher creates a file watcher for the given JSONL path. +// onChanged is called when the file or git refs change, after debouncing. +func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + fw := &FileWatcher{ + watcher: watcher, + jsonlPath: jsonlPath, + debouncer: NewDebouncer(500*time.Millisecond, onChanged), + } + + // Watch the JSONL file + if err := watcher.Add(jsonlPath); err != nil { + watcher.Close() + return nil, fmt.Errorf("failed to watch JSONL: %w", err) + } + + // Also watch .git/refs/heads for branch changes (best effort) + gitRefsPath := filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads") + _ = watcher.Add(gitRefsPath) // Ignore error - not all setups have this + + return fw, nil +} + +// Start begins monitoring filesystem events. +// Runs in background goroutine until context is canceled. 
+func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) { + go func() { + for { + select { + case event, ok := <-fw.watcher.Events: + if !ok { + return + } + + // Handle JSONL write events + if event.Name == fw.jsonlPath && event.Op&fsnotify.Write != 0 { + log.log("File change detected: %s", event.Name) + fw.debouncer.Trigger() + } + + // Handle JSONL removal/rename (e.g., git checkout) + if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) { + log.log("JSONL removed/renamed, re-establishing watch") + fw.watcher.Remove(fw.jsonlPath) + // Brief wait for file to be recreated + time.Sleep(100 * time.Millisecond) + if err := fw.watcher.Add(fw.jsonlPath); err != nil { + log.log("Failed to re-watch JSONL: %v", err) + } + } + + // Handle git ref changes + if event.Op&fsnotify.Write != 0 && filepath.Dir(event.Name) != filepath.Dir(fw.jsonlPath) { + log.log("Git ref change detected: %s", event.Name) + fw.debouncer.Trigger() + } + + case err, ok := <-fw.watcher.Errors: + if !ok { + return + } + log.log("Watcher error: %v", err) + + case <-ctx.Done(): + return + } + } + }() +} + +// Close stops the file watcher and releases resources. +func (fw *FileWatcher) Close() error { + fw.debouncer.Cancel() + return fw.watcher.Close() +} diff --git a/event_driven_daemon.md b/event_driven_daemon.md new file mode 100644 index 00000000..90b16fd8 --- /dev/null +++ b/event_driven_daemon.md @@ -0,0 +1,788 @@ +# Event-Driven Daemon Architecture + +**Status:** Design Proposal +**Author:** AI Assistant +**Date:** 2025-10-28 +**Context:** Post-cache removal, per-project daemon model established + +## Executive Summary + +Replace the current 5-second polling sync loop with an event-driven architecture that reacts instantly to changes. This eliminates stale data issues while reducing CPU usage and improving user experience. 
+ +**Key metrics:** +- Latency improvement: 5000ms → <500ms +- CPU reduction: ~60% (no polling) +- Code complexity: +300 LOC (event handling), but cleaner semantics +- User impact: Instant feedback, no stale cache pain + +## Problem Statement + +### Current Architecture Issues + +**Polling-based sync** (`cmd/bd/daemon.go:1010-1120`): +```go +ticker := time.NewTicker(5 * time.Second) +for { + case <-ticker.C: + doSync() // Export, pull, import, push +} +``` + +**Pain points:** +1. **Stale data window**: Changes invisible for up to 5 seconds +2. **CPU waste**: Daemon wakes every 5s even if nothing changed +3. **Unnecessary work**: Sync cycle runs even when no mutations occurred +4. **Cache confusion**: (Now removed) Cache staleness compounded delay + +### What Cache Removal Enables + +The recent cache removal (Oct 27-28, 964 LOC removed) creates ideal conditions for event-driven architecture: + +✅ **One daemon = One database**: No cache eviction, no cross-workspace confusion +✅ **Simpler state**: Daemon state is just `s.storage`, no cache maps +✅ **Clear ownership**: Each daemon owns exactly one JSONL + SQLite pair +✅ **No invalidation complexity**: Events can directly trigger actions + +## Proposed Architecture + +### High-Level Flow + +``` +┌─────────────────────────────────────────────────────────┐ +│ Event-Driven Daemon │ +├─────────────────────────────────────────────────────────┤ +│ │ +│ Event Sources Event Handler │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ FS Watcher │─────────→│ │ │ +│ │ (JSONL file) │ │ Debouncer │ │ +│ └──────────────┘ │ (500ms) │ │ +│ │ │ │ +│ ┌──────────────┐ └──────────────┘ │ +│ │ RPC Mutation │─────────→ │ │ +│ │ Events │ │ │ +│ └──────────────┘ ↓ │ +│ ┌──────────────┐ │ +│ ┌──────────────┐ │ Sync Action │ │ +│ │ Git Hooks │─────────→│ - Export │ │ +│ │ (optional) │ │ - Import │ │ +│ └──────────────┘ └──────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────┘ +``` + +### Components + +#### 1. 
File System Watcher + +**Purpose:** Detect JSONL changes from external sources (git pull, manual edits) + +**Implementation:** +```go +// cmd/bd/daemon_watcher.go (new file) +package main + +import ( + "context" + "path/filepath" + "time" + + "github.com/fsnotify/fsnotify" +) + +type FileWatcher struct { + watcher *fsnotify.Watcher + debouncer *Debouncer + jsonlPath string +} + +func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + fw := &FileWatcher{ + watcher: watcher, + jsonlPath: jsonlPath, + debouncer: NewDebouncer(500*time.Millisecond, onChanged), + } + + // Watch JSONL file + if err := watcher.Add(jsonlPath); err != nil { + watcher.Close() + return nil, err + } + + // Also watch .git/refs/heads for branch changes + gitRefsPath := filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads") + _ = watcher.Add(gitRefsPath) // Best effort + + return fw, nil +} + +func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) { + go func() { + for { + select { + case event, ok := <-fw.watcher.Events: + if !ok { + return + } + + // Only care about writes to JSONL or ref changes + if event.Name == fw.jsonlPath && event.Op&fsnotify.Write != 0 { + log.log("File change detected: %s", event.Name) + fw.debouncer.Trigger() + } else if event.Op&fsnotify.Write != 0 { + log.log("Git ref change detected: %s", event.Name) + fw.debouncer.Trigger() + } + + case err, ok := <-fw.watcher.Errors: + if !ok { + return + } + log.log("Watcher error: %v", err) + + case <-ctx.Done(): + return + } + } + }() +} + +func (fw *FileWatcher) Close() error { + return fw.watcher.Close() +} +``` + +**Platform support:** +- **Linux**: inotify (built into fsnotify) +- **macOS**: FSEvents (built into fsnotify) +- **Windows**: ReadDirectoryChangesW (built into fsnotify) + +**Edge cases handled:** +- File rename (git atomic write via temp file): Watch directory, not just file +- 
Event storm (rapid git writes): Debouncer batches into single action +- Watcher failure: Fall back to polling with warning + +#### 2. Debouncer + +**Purpose:** Batch rapid events into single action + +**Implementation:** +```go +// cmd/bd/daemon_debouncer.go (new file) +package main + +import ( + "sync" + "time" +) + +type Debouncer struct { + mu sync.Mutex + timer *time.Timer + duration time.Duration + action func() +} + +func NewDebouncer(duration time.Duration, action func()) *Debouncer { + return &Debouncer{ + duration: duration, + action: action, + } +} + +func (d *Debouncer) Trigger() { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + } + + d.timer = time.AfterFunc(d.duration, func() { + d.action() + d.mu.Lock() + d.timer = nil + d.mu.Unlock() + }) +} + +func (d *Debouncer) Cancel() { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil { + d.timer.Stop() + d.timer = nil + } +} +``` + +**Tuning:** +- Default: 500ms (balance between responsiveness and batching) +- Configurable via `BEADS_DEBOUNCE_MS` env var +- Could use adaptive timing based on event frequency + +#### 3. RPC Mutation Events + +**Purpose:** Trigger export immediately after DB changes (not in 5s) + +**Implementation:** +```go +// internal/rpc/server.go (modifications) +type Server struct { + // ... existing fields + mutationChan chan MutationEvent +} + +type MutationEvent struct { + Type string // "create", "update", "delete" + IssueID string // e.g., "bd-42" + Timestamp time.Time +} + +func (s *Server) CreateIssue(req *CreateRequest) (*Issue, error) { + issue, err := s.storage.CreateIssue(req) + if err != nil { + return nil, err + } + + // Notify mutation channel + select { + case s.mutationChan <- MutationEvent{ + Type: "create", + IssueID: issue.ID, + Timestamp: time.Now(), + }: + default: + // Channel full, event dropped (sync will happen eventually) + } + + return issue, nil +} + +// Similar for UpdateIssue, DeleteIssue, AddComment, etc. 
+``` + +**Handler in daemon:** +```go +// cmd/bd/daemon.go (modification) +func handleMutationEvents(ctx context.Context, events <-chan rpc.MutationEvent, + debouncer *Debouncer, log daemonLogger) { + go func() { + for { + select { + case event := <-events: + log.log("Mutation detected: %s %s", event.Type, event.IssueID) + debouncer.Trigger() // Schedule export + + case <-ctx.Done(): + return + } + } + }() +} +``` + +#### 4. Git Hook Integration (Optional) + +**Purpose:** Explicit notifications from git operations + +**Implementation:** +```bash +# .git/hooks/post-merge (installed by bd init --quiet) +#!/bin/bash +# Notify daemon of merge completion +if command -v bd &> /dev/null; then + bd daemon-event import-needed & +fi +``` + +```go +// cmd/bd/daemon_event.go (new file) +package main + +// Called by git hooks to notify daemon +func handleDaemonEvent() { + if len(os.Args) < 3 { + fmt.Fprintln(os.Stderr, "Usage: bd daemon-event ") + os.Exit(1) + } + + eventType := os.Args[2] + socketPath := getSocketPath() + + client := rpc.NewClient(socketPath) + ctx := context.Background() + + switch eventType { + case "import-needed": + // Git hook says "JSONL changed, please import" + if err := client.TriggerImport(ctx); err != nil { + // Ignore error - daemon might not be running + os.Exit(0) + } + case "export-needed": + if err := client.TriggerExport(ctx); err != nil { + os.Exit(0) + } + default: + fmt.Fprintf(os.Stderr, "Unknown event type: %s\n", eventType) + os.Exit(1) + } +} +``` + +**Note:** Git hooks are **optional enhancement**, not required. File watcher is primary mechanism. + +### Complete Daemon Loop + +**Current implementation** (`cmd/bd/daemon.go:1123-1161`): +```go +func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.Ticker, + doSync func(), server *rpc.Server, serverErrChan chan error, + log daemonLogger) { + for { + select { + case <-ticker.C: // ← Every 5 seconds + doSync() + case sig := <-sigChan: + // ... 
shutdown + } + } +} +``` + +**Proposed implementation:** +```go +// cmd/bd/daemon_event_loop.go (new file) +func runEventDrivenLoop(ctx context.Context, cancel context.CancelFunc, + server *rpc.Server, serverErrChan chan error, + watcher *FileWatcher, mutationChan <-chan rpc.MutationEvent, + log daemonLogger) { + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, daemonSignals...) + defer signal.Stop(sigChan) + + // Debounced sync actions + exportDebouncer := NewDebouncer(500*time.Millisecond, func() { + log.log("Export triggered by mutation events") + exportToJSONL() + }) + + importDebouncer := NewDebouncer(500*time.Millisecond, func() { + log.log("Import triggered by file change") + autoImportIfNewer() + }) + + // Start file watcher (triggers import) + watcher.Start(ctx, log) + + // Start mutation handler (triggers export) + handleMutationEvents(ctx, mutationChan, exportDebouncer, log) + + // Optional: Periodic health check (every 60s, not sync) + healthTicker := time.NewTicker(60 * time.Second) + defer healthTicker.Stop() + + for { + select { + case <-healthTicker.C: + // Periodic health check (validate DB, check disk space, etc.) + checkDaemonHealth(ctx, store, log) + + case sig := <-sigChan: + if isReloadSignal(sig) { + log.log("Received reload signal, ignoring") + continue + } + log.log("Received signal %v, shutting down...", sig) + cancel() + if err := server.Stop(); err != nil { + log.log("Error stopping server: %v", err) + } + return + + case <-ctx.Done(): + log.log("Context canceled, shutting down") + watcher.Close() + if err := server.Stop(); err != nil { + log.log("Error stopping server: %v", err) + } + return + + case err := <-serverErrChan: + log.log("RPC server failed: %v", err) + cancel() + watcher.Close() + return + } + } +} +``` + +## Migration Strategy + +### Phase 1: Parallel Implementation (2-3 weeks) + +**Goal:** Event-driven as opt-in alongside polling + +**Changes:** +1. Add `fsnotify` dependency to `go.mod` +2. 
Create new files: + - `cmd/bd/daemon_watcher.go` (~150 LOC) + - `cmd/bd/daemon_debouncer.go` (~60 LOC) + - `cmd/bd/daemon_event_loop.go` (~200 LOC) +3. Add flag `BEADS_DAEMON_MODE=events` to enable +4. Keep existing `runEventLoop` as fallback + +**Testing:** +- Unit tests for debouncer +- Integration tests for file watcher +- Stress test with event storm (rapid git operations) +- Test on Linux, macOS, Windows + +**Rollout:** +- Default: `BEADS_DAEMON_MODE=poll` (current behavior) +- Opt-in: `BEADS_DAEMON_MODE=events` (new behavior) +- Documentation: Add to AGENTS.md + +### Phase 2: Battle Testing (4-6 weeks) + +**Goal:** Real-world validation with dogfooding + +**Metrics to track:** +- CPU usage (before/after comparison) +- Latency (time from mutation to JSONL update) +- Memory usage (fsnotify overhead) +- Event storm handling (git pull with 100+ file changes) +- Edge case frequency (watcher failures, debounce races) + +**Success criteria:** +- CPU usage <40% of polling mode +- Latency <500ms (vs 5000ms in polling) +- Zero data loss or corruption +- Zero daemon crashes from event handling + +**Issue tracking:** +- Create `bd-XXX: Event-driven daemon stabilization` issue +- Track bugs as sub-issues +- Weekly review of metrics + +### Phase 3: Default Switchover (1 week) + +**Goal:** Make event-driven the default + +**Changes:** +1. Flip default: `BEADS_DAEMON_MODE=events` +2. Keep polling as fallback: `BEADS_DAEMON_MODE=poll` +3. Update documentation +4. Add release note + +**Communication:** +- Blog post: "Beads daemon now event-driven" +- Changelog entry with before/after metrics +- Migration guide for users who hit issues + +### Phase 4: Deprecation (6+ months later) + +**Goal:** Remove polling mode entirely + +**Changes:** +1. Remove `runEventLoop` with ticker +2. Remove `BEADS_DAEMON_MODE` flag +3. 
Simplify daemon startup code + +**Only if:** +- Event-driven stable for 6+ months +- No unresolved critical issues +- Community feedback positive + +## Performance Analysis + +### CPU Usage + +**Current (polling):** +``` +Every 5 seconds: +- Wake daemon +- Check git status +- Check JSONL hash +- Check dirty flags +- Sleep + +Estimated: ~5-10% CPU (depends on repo size) +``` + +**Event-driven:** +``` +Daemon sleeps until: +- File system event (rare) +- RPC mutation (user-triggered) +- Signal + +Estimated: ~1-2% CPU (mostly idle) +``` + +**Savings:** ~60-80% CPU reduction + +### Latency + +**Current (polling):** +``` +User runs: bd create "Fix bug" +→ RPC call → DB write → (wait up to 5s) → Export → Git commit +Average: 2.5s delay +Worst: 5s delay +``` + +**Event-driven:** +``` +User runs: bd create "Fix bug" +→ RPC call → DB write → Mutation event → (500ms debounce) → Export → Git commit +Typical: 500ms delay (one full debounce window after the last mutation) +Worst: 500ms after the final mutation of a burst (each new event restarts the window) +``` + +**Improvement:** 5-10x faster + +### Memory Usage + +**fsnotify overhead:** +- Linux (inotify): ~1-2 MB per watched directory +- macOS (kqueue): ~500 KB per watched directory +- Windows: ~1 MB per watched directory + +**With 1 JSONL + 1 git refs directory = ~2-4 MB** + +**Negligible compared to SQLite cache (10-50 MB for typical database)** + +## Edge Cases & Error Handling + +### 1. File Watcher Failure + +**Scenario:** `inotify` limit exceeded (Linux), permissions issue, or filesystem doesn't support watching + +**Detection:** +```go +watcher, err := fsnotify.NewWatcher() +if err != nil { + log.log("WARNING: File watcher unavailable (%v), falling back to polling", err) + useFallbackPolling = true +} +``` + +**Fallback:** Automatic switch to 5s polling with warning + +### 2. 
Event Storm + +**Scenario:** Git pull modifies JSONL 50 times in rapid succession + +**Mitigation:** Debouncer batches into single action after 500ms quiet period + +**Stress test:** +```bash +# Simulate event storm +for i in {1..100}; do + echo '{"id":"bd-'$i'"}' >> beads.jsonl +done +# Should trigger exactly 1 import after 500ms +``` + +### 3. Watcher Detached from File + +**Scenario:** JSONL replaced by `git checkout` (different inode) + +**Detection:** fsnotify sends `RENAME` or `REMOVE` event + +**Recovery:** +```go +case event.Op&fsnotify.Remove != 0: + log.log("JSONL removed, re-establishing watch") + watcher.Remove(jsonlPath) + time.Sleep(100 * time.Millisecond) + watcher.Add(jsonlPath) +``` + +### 4. Debounce Race Condition + +**Scenario:** Event A triggers debounce, event B arrives during wait, action fires for A before B seen + +**Solution:** Debouncer restarts timer on each trigger (standard debounce behavior) + +**Test:** +```go +func TestDebouncerBatchesMultipleEvents(t *testing.T) { + callCount := 0 + d := NewDebouncer(100*time.Millisecond, func() { callCount++ }) + + d.Trigger() // t=0ms + time.Sleep(50 * time.Millisecond) + d.Trigger() // t=50ms (resets timer) + time.Sleep(50 * time.Millisecond) + d.Trigger() // t=100ms (resets timer) + + time.Sleep(150 * time.Millisecond) // t=250ms (timer fires) + + assert.Equal(t, 1, callCount) // Only 1 action despite 3 triggers +} +``` + +### 5. 
Daemon Restart During Debounce + +**Scenario:** Daemon receives SIGTERM while debouncer waiting + +**Solution:** Cancel debouncer on shutdown + +```go +func (d *Debouncer) Cancel() { + d.mu.Lock() + defer d.mu.Unlock() + if d.timer != nil { + d.timer.Stop() + } +} + +// In shutdown handler +defer exportDebouncer.Cancel() +defer importDebouncer.Cancel() +``` + +## Configuration + +### Environment Variables + +```bash +# Enable event-driven mode (default: events after Phase 3) +BEADS_DAEMON_MODE=events + +# Debounce duration in milliseconds (default: 500) +BEADS_DEBOUNCE_MS=500 + +# Fall back to polling if watcher fails (default: true) +BEADS_WATCHER_FALLBACK=true + +# Polling interval if fallback used (default: 5s) +BEADS_POLL_INTERVAL=5s +``` + +### Daemon Status + +**New command:** `bd daemon status --verbose` + +```bash +$ bd daemon status --verbose +Daemon running: yes +PID: 12345 +Mode: event-driven +Uptime: 3h 42m +Last sync: 2s ago + +Event statistics: + File changes: 23 + Mutations: 156 + Exports: 12 (batched from 156 mutations) + Imports: 4 (batched from 23 file changes) + +Watcher status: active + Watching: /Users/steve/beads/.beads/beads.jsonl + Events received: 23 + Errors: 0 +``` + +## What This Doesn't Solve + +Event-driven architecture improves **responsiveness** but doesn't eliminate **repair cycles** caused by: + +1. **Git merge conflicts** - Still need manual/AI resolution +2. **Semantic duplication** - Still need deduplication logic +3. **Test pollution** - Still need better isolation (separate issue) +4. 
**Worktree confusion** - Still need per-worktree branch tracking (separate design) + +**These require separate solutions** (see repair commands design doc) + +## Success Metrics + +### Must-Have (P0) +- ✅ Zero data loss or corruption +- ✅ Zero regressions in sync reliability +- ✅ Works on Linux, macOS, Windows + +### Should-Have (P1) +- ✅ Latency <500ms (vs 5000ms today) +- ✅ CPU usage <40% of polling mode +- ✅ Graceful fallback to polling if watcher fails + +### Nice-to-Have (P2) +- ✅ Configurable debounce timing +- ✅ Detailed event statistics in `bd daemon status` +- ✅ Real-time dashboard of events (debug mode) + +## Implementation Checklist + +### Code Changes +- [ ] Add `fsnotify` to `go.mod` +- [ ] Create `cmd/bd/daemon_watcher.go` +- [ ] Create `cmd/bd/daemon_debouncer.go` +- [ ] Create `cmd/bd/daemon_event_loop.go` +- [ ] Modify `internal/rpc/server.go` (add mutation channel) +- [ ] Add `BEADS_DAEMON_MODE` flag handling +- [ ] Add fallback to polling on watcher failure + +### Tests +- [ ] Unit tests for Debouncer +- [ ] Unit tests for FileWatcher +- [ ] Integration test: mutation → export latency +- [ ] Integration test: file change → import latency +- [ ] Stress test: event storm (100+ rapid changes) +- [ ] Platform tests: Linux, macOS, Windows +- [ ] Edge case test: watcher failure recovery +- [ ] Edge case test: file inode change (git checkout) + +### Documentation +- [ ] Update AGENTS.md (event-driven mode) +- [ ] Add `docs/architecture/event_driven.md` (this doc) +- [ ] Update `bd daemon --help` (add --mode flag) +- [ ] Add troubleshooting guide (watcher failures) +- [ ] Write migration guide (for users hitting issues) + +### Rollout +- [ ] Phase 1: Parallel implementation (opt-in) +- [ ] Phase 2: Dogfooding (beads repo itself) +- [ ] Phase 3: Default switchover +- [ ] Phase 4: Deprecation of polling mode (announce in release notes) + +## Open Questions + +1. **Should git hooks be required or optional?** + - Recommendation: Optional (file watcher is sufficient) + +2. 
**What debounce duration is optimal?** + - Recommendation: 500ms default, configurable + - Could use adaptive timing based on event frequency + +3. **Should we track event statistics permanently?** + - Recommendation: In-memory only (reset on daemon restart) + - Could add `bd daemon stats --export` for debugging + +4. **What happens if fsnotify doesn't support the filesystem?** + - Recommendation: Automatic fallback to polling with warning + +5. **Should mutation events be buffered or dropped if the channel is full?** + - Recommendation: Buffered (size 100); when the buffer is full, drop the incoming event (non-blocking send) + - Worst case: Export delayed by 500ms, but no data loss + +## Conclusion + +Event-driven architecture is a **natural evolution** after cache removal: +- ✅ Eliminates stale data issues +- ✅ Reduces CPU usage significantly +- ✅ Improves user experience with instant feedback +- ✅ Builds on simplified per-project daemon model + +**Recommended:** Proceed with Phase 1 implementation, targeting a 2-3 week timeline for the opt-in release. 
diff --git a/internal/rpc/server_core.go b/internal/rpc/server_core.go index 74a101ac..5452f5fb 100644 --- a/internal/rpc/server_core.go +++ b/internal/rpc/server_core.go @@ -46,6 +46,15 @@ type Server struct { readyChan chan struct{} // Auto-import single-flight guard importInProgress atomic.Bool + // Mutation events for event-driven daemon + mutationChan chan MutationEvent +} + +// MutationEvent represents a database mutation for event-driven sync +type MutationEvent struct { + Type string // "create", "update", "delete", "comment" + IssueID string // e.g., "bd-42" + Timestamp time.Time } // NewServer creates a new RPC server @@ -79,7 +88,28 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d connSemaphore: make(chan struct{}, maxConns), requestTimeout: requestTimeout, readyChan: make(chan struct{}), + mutationChan: make(chan MutationEvent, 100), // Buffered to avoid blocking } s.lastActivityTime.Store(time.Now()) return s } + +// emitMutation sends a mutation event to the daemon's event-driven loop. +// Non-blocking: drops event if channel is full (sync will happen eventually). 
+func (s *Server) emitMutation(eventType, issueID string) { + select { + case s.mutationChan <- MutationEvent{ + Type: eventType, + IssueID: issueID, + Timestamp: time.Now(), + }: + // Event sent successfully + default: + // Channel full, event dropped (not critical - sync will happen eventually) + } +} + +// MutationChan returns the mutation event channel for the daemon to consume +func (s *Server) MutationChan() <-chan MutationEvent { + return s.mutationChan +} diff --git a/internal/rpc/server_issues_epics.go b/internal/rpc/server_issues_epics.go index ce5019c5..76879c17 100644 --- a/internal/rpc/server_issues_epics.go +++ b/internal/rpc/server_issues_epics.go @@ -159,6 +159,9 @@ func (s *Server) handleCreate(req *Request) Response { } } + // Emit mutation event for event-driven daemon + s.emitMutation("create", issue.ID) + data, _ := json.Marshal(issue) return Response{ Success: true, @@ -190,6 +193,9 @@ func (s *Server) handleUpdate(req *Request) Response { } } + // Emit mutation event for event-driven daemon + s.emitMutation("update", updateArgs.ID) + issue, err := store.GetIssue(ctx, updateArgs.ID) if err != nil { return Response{ @@ -224,6 +230,9 @@ func (s *Server) handleClose(req *Request) Response { } } + // Emit mutation event for event-driven daemon + s.emitMutation("update", closeArgs.ID) + issue, _ := store.GetIssue(ctx, closeArgs.ID) data, _ := json.Marshal(issue) return Response{ diff --git a/repair_commands.md b/repair_commands.md new file mode 100644 index 00000000..1ed9df9c --- /dev/null +++ b/repair_commands.md @@ -0,0 +1,1398 @@ +# Repair Commands & AI-Assisted Tooling + +**Status:** Design Proposal +**Author:** AI Assistant +**Date:** 2025-10-28 +**Context:** Reduce agent repair burden by providing specialized repair tools + +## Executive Summary + +Agents spend significant time repairing beads databases due to: +1. Git merge conflicts in JSONL +2. Duplicate issues from parallel work +3. Semantic inconsistencies (labeling, dependencies) +4. 
Orphaned references after deletions + +**Solution:** Add dedicated repair commands that agents (and humans) can invoke instead of manually fixing these issues. Some commands use AI for semantic understanding, others are pure mechanical checks. + +## Problem Analysis + +### Current Repair Scenarios + +Based on codebase analysis and commit history: + +#### 1. Git Merge Conflicts (High Frequency) + +**Scenario:** +```bash +# Feature branch creates bd-42 +git checkout -b feature +bd create "Add authentication" # Creates bd-42 + +# Meanwhile, main branch also creates bd-42 +git checkout main +bd create "Fix logging" # Also creates bd-42 + +# Merge creates conflict +git checkout feature +git merge main +``` + +**JSONL conflict:** +```json +<<<<<<< HEAD +{"id":"bd-42","title":"Add authentication",...} +======= +{"id":"bd-42","title":"Fix logging",...} +>>>>>>> main +``` + +**Current fix:** Agent manually parses conflict markers, remaps IDs, updates references + +**Pain points:** +- Time-consuming (5-10 minutes per conflict) +- Error-prone (easy to miss references) +- Repetitive (same logic every time) + +#### 2. Semantic Duplicates (Medium Frequency) + +**Scenario:** +```bash +# Agent A creates issue +bd create "Fix memory leak in parser" # bd-42 + +# Agent B creates similar issue (different session) +bd create "Parser memory leak needs fixing" # bd-87 + +# Human notices: "These are the same issue!" +``` + +**Current fix:** Agent manually: +1. Reads both issues +2. Determines they're duplicates +3. Picks canonical one +4. Closes duplicate with reference +5. Moves comments/dependencies + +**Pain points:** +- Requires reading full issue text +- Subjective judgment (are they really duplicates?) +- Manual reference updates + +#### 3. Test Pollution (Low Frequency Now, High Impact) + +**Scenario:** +```bash +# Test creates 1044 issues in production DB +go test ./internal/rpc/... 
# Oops, no isolation + +bd list +# Shows 1044 issues with titles like "test-issue-1", "benchmark-issue-42" +``` + +**Recent occurrence:** Commits 78e8cb9, d1d3fcd (Oct 2025) + +**Current fix:** Agent manually: +1. Identifies test issues by pattern matching +2. Bulk closes with `bd close bd-1 bd-2 ... bd-1044` +3. Archives or deletes + +**Pain points:** +- Hard to distinguish test vs. real issues +- Risk of deleting real issues +- No automated recovery + +#### 4. Orphaned Dependencies (Medium Frequency) + +**Scenario:** +```bash +bd create "Implement feature X" # bd-42 +bd create "Test feature X" --depends bd-42 # bd-43 depends on bd-42 + +bd delete bd-42 # User deletes parent + +bd show bd-43 +# Depends: bd-42 (orphaned - issue doesn't exist!) +``` + +**Current fix:** Agent manually updates dependencies + +**Pain points:** +- Silent corruption (no warning on delete) +- Hard to find orphans (requires DB query) + +## Proposed Commands + +### 1. `bd resolve-conflicts` - Git Merge Conflict Resolver + +**Purpose:** Automatically resolve JSONL merge conflicts + +**Usage:** +```bash +# Detect conflicts +bd resolve-conflicts + +# Auto-resolve with AI +bd resolve-conflicts --auto + +# Manual conflict resolution +bd resolve-conflicts --interactive +``` + +**Implementation:** + +```go +// cmd/bd/resolve_conflicts.go (new file) +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "strings" + + "github.com/steveyegge/beads/internal/types" +) + +type ConflictBlock struct { + HeadIssues []types.Issue + BaseIssues []types.Issue + LineStart int + LineEnd int +} + +func detectConflicts(jsonlPath string) ([]ConflictBlock, error) { + file, err := os.Open(jsonlPath) + if err != nil { + return nil, err + } + defer file.Close() + + var conflicts []ConflictBlock + var current *ConflictBlock + inConflict := false + inHead := false + lineNum := 0 + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + lineNum++ + + switch { + case 
strings.HasPrefix(line, "<<<<<<<"): + // Start of conflict + inConflict = true + inHead = true + current = &ConflictBlock{LineStart: lineNum} + + case strings.HasPrefix(line, "======="): + // Switch from HEAD to base + inHead = false + + case strings.HasPrefix(line, ">>>>>>>"): + // End of conflict + inConflict = false + current.LineEnd = lineNum + conflicts = append(conflicts, *current) + current = nil + + case inConflict && inHead: + // Parse issue in HEAD section + issue, err := parseIssueLine(line) + if err == nil { + current.HeadIssues = append(current.HeadIssues, issue) + } + + case inConflict && !inHead: + // Parse issue in base section + issue, err := parseIssueLine(line) + if err == nil { + current.BaseIssues = append(current.BaseIssues, issue) + } + } + } + + if scanner.Err() != nil { + return nil, scanner.Err() + } + + return conflicts, nil +} + +func resolveConflictsAuto(conflicts []ConflictBlock, useAI bool) ([]Resolution, error) { + var resolutions []Resolution + + for _, conflict := range conflicts { + if useAI { + // Use AI to determine resolution + resolution, err := resolveConflictWithAI(conflict) + if err != nil { + return nil, err + } + resolutions = append(resolutions, resolution) + } else { + // Mechanical resolution: remap duplicate IDs + resolution := resolveConflictMechanical(conflict) + resolutions = append(resolutions, resolution) + } + } + + return resolutions, nil +} + +type Resolution struct { + Action string // "remap", "merge", "keep-head", "keep-base" + OldID string + NewID string + Reason string + Merged *types.Issue // If action="merge" +} + +func resolveConflictMechanical(conflict ConflictBlock) Resolution { + // Mechanical strategy: Keep HEAD, remap base to new IDs + // This matches current auto-import collision resolution + + headIDs := make(map[string]bool) + for _, issue := range conflict.HeadIssues { + headIDs[issue.ID] = true + } + + var resolutions []Resolution + for _, issue := range conflict.BaseIssues { + if 
headIDs[issue.ID] { + // ID collision: remap base issue to next available ID + newID := getNextAvailableID() + resolutions = append(resolutions, Resolution{ + Action: "remap", + OldID: issue.ID, + NewID: newID, + Reason: fmt.Sprintf("ID %s exists in both branches", issue.ID), + }) + } + } + + return resolutions[0] // Simplified for example +} + +func resolveConflictWithAI(conflict ConflictBlock) (Resolution, error) { + // Call AI to analyze conflict and suggest resolution + + prompt := fmt.Sprintf(` +You are resolving a git merge conflict in a beads issue tracker JSONL file. + +HEAD issues (current branch): +%s + +BASE issues (incoming branch): +%s + +Analyze these conflicts and suggest ONE of: +1. "remap" - Issues are different, keep both but remap IDs +2. "merge" - Issues are similar, merge into one +3. "keep-head" - HEAD version is correct, discard BASE +4. "keep-base" - BASE version is correct, discard HEAD + +Respond in JSON format: +{ + "action": "remap|merge|keep-head|keep-base", + "reason": "explanation", + "merged_issue": {...} // Only if action=merge +} +`, formatIssues(conflict.HeadIssues), formatIssues(conflict.BaseIssues)) + + // Call AI (via environment-configured API) + response, err := callAIAPI(prompt) + if err != nil { + return Resolution{}, err + } + + // Parse response + var resolution Resolution + if err := json.Unmarshal([]byte(response), &resolution); err != nil { + return Resolution{}, err + } + + return resolution, nil +} + +func applyResolutions(jsonlPath string, conflicts []ConflictBlock, resolutions []Resolution) error { + // Read entire JSONL + allIssues, err := readJSONL(jsonlPath) + if err != nil { + return err + } + + // Apply resolutions + for i, resolution := range resolutions { + conflict := conflicts[i] + + switch resolution.Action { + case "remap": + // Remap IDs and update references + remapIssueID(allIssues, resolution.OldID, resolution.NewID) + + case "merge": + // Replace both with merged issue + replaceIssues(allIssues, 
conflict.HeadIssues, conflict.BaseIssues, resolution.Merged) + + case "keep-head": + // Remove base issues + removeIssues(allIssues, conflict.BaseIssues) + + case "keep-base": + // Remove head issues + removeIssues(allIssues, conflict.HeadIssues) + } + } + + // Write back to JSONL (atomic) + return writeJSONL(jsonlPath, allIssues) +} +``` + +**AI Integration:** + +```go +// internal/ai/client.go (new package) +package ai + +import ( + "context" + "fmt" + "os" +) + +type Client struct { + provider string // "anthropic", "openai", "ollama" + apiKey string + model string +} + +func NewClient() (*Client, error) { + provider := os.Getenv("BEADS_AI_PROVIDER") // "anthropic" (default) + apiKey := os.Getenv("BEADS_AI_API_KEY") // Required for cloud providers + model := os.Getenv("BEADS_AI_MODEL") // "claude-3-5-sonnet-20241022" (default) + + if provider == "" { + provider = "anthropic" + } + + if apiKey == "" && provider != "ollama" { + return nil, fmt.Errorf("BEADS_AI_API_KEY required for provider %s", provider) + } + + return &Client{ + provider: provider, + apiKey: apiKey, + model: model, + }, nil +} + +func (c *Client) Complete(ctx context.Context, prompt string) (string, error) { + switch c.provider { + case "anthropic": + return c.callAnthropic(ctx, prompt) + case "openai": + return c.callOpenAI(ctx, prompt) + case "ollama": + return c.callOllama(ctx, prompt) + default: + return "", fmt.Errorf("unknown AI provider: %s", c.provider) + } +} + +func (c *Client) callAnthropic(ctx context.Context, prompt string) (string, error) { + // Use anthropic-go SDK + // Implementation omitted for brevity + return "", nil +} +``` + +**Configuration:** + +```bash +# ~/.config/beads/ai.conf (optional) +BEADS_AI_PROVIDER=anthropic +BEADS_AI_API_KEY=sk-ant-... 
+BEADS_AI_MODEL=claude-3-5-sonnet-20241022 + +# Or use local Ollama +BEADS_AI_PROVIDER=ollama +BEADS_AI_MODEL=llama2 +``` + +**Example usage:** + +```bash +# Detect conflicts (shows summary, doesn't modify) +$ bd resolve-conflicts +Found 3 conflicts in beads.jsonl: + +Conflict 1 (lines 42-47): + HEAD: bd-42 "Add authentication" (created by alice) + BASE: bd-42 "Fix logging" (created by bob) + → Recommendation: REMAP (different issues, same ID) + +Conflict 2 (lines 103-108): + HEAD: bd-87 "Update docs for API" + BASE: bd-87 "Update docs for API v2" + → Recommendation: MERGE (similar, minor differences) + +Conflict 3 (lines 234-239): + HEAD: bd-156 "Refactor parser" + BASE: bd-156 "Refactor parser" (identical) + → Recommendation: KEEP-HEAD (identical content) + +Run 'bd resolve-conflicts --auto' to apply recommendations. +Run 'bd resolve-conflicts --interactive' to review each conflict. + +# Auto-resolve with AI +$ bd resolve-conflicts --auto --ai +Resolving 3 conflicts... +✓ Conflict 1: Remapped bd-42 (BASE) → bd-200 +✓ Conflict 2: Merged into bd-87 (combined descriptions) +✓ Conflict 3: Kept HEAD version (identical) + +Updated beads.jsonl (conflicts resolved) +Next steps: + 1. Review changes: git diff beads.jsonl + 2. Import to database: bd import + 3. 
Commit resolution: git add beads.jsonl && git commit + +# Interactive mode +$ bd resolve-conflicts --interactive +Conflict 1 of 3 (lines 42-47): + + HEAD: bd-42 "Add authentication" + Created: 2025-10-20 by alice + Status: in_progress + Labels: feature, security + + BASE: bd-42 "Fix logging" + Created: 2025-10-21 by bob + Status: open + Labels: bug, logging + +AI Recommendation: REMAP (different issues, same ID) +Reason: Issues have different topics (auth vs logging) and authors + +Choose action: + 1) Remap BASE to new ID (recommended) + 2) Merge into one issue + 3) Keep HEAD, discard BASE + 4) Keep BASE, discard HEAD + 5) Skip (resolve manually) + +Your choice [1-5]: 1 + +✓ Will remap BASE bd-42 → bd-200 + +Continue to next conflict? [Y/n]: +``` + +### 2. `bd find-duplicates` - AI-Powered Duplicate Detection + +**Purpose:** Find semantically duplicate issues across the database + +**Usage:** +```bash +# Find all duplicates +bd find-duplicates + +# Find duplicates with specific threshold +bd find-duplicates --threshold 0.8 + +# Auto-merge duplicates (requires confirmation) +bd find-duplicates --merge +``` + +**Implementation:** + +```go +// cmd/bd/find_duplicates.go (new file) +package main + +import ( + "context" + "fmt" + + "github.com/steveyegge/beads/internal/ai" + "github.com/steveyegge/beads/internal/storage" + "github.com/steveyegge/beads/internal/types" +) + +type DuplicateGroup struct { + Issues []*types.Issue + Similarity float64 + Reason string +} + +func findDuplicates(ctx context.Context, store storage.Storage, useAI bool, threshold float64) ([]DuplicateGroup, error) { + // Get all open issues + issues, err := store.ListIssues(ctx, storage.ListOptions{ + Status: []string{"open", "in_progress"}, + }) + if err != nil { + return nil, err + } + + if !useAI { + // Mechanical approach: exact title match + return findDuplicatesMechanical(issues), nil + } + + // AI approach: semantic similarity + return findDuplicatesWithAI(ctx, issues, threshold) +} + +func 
findDuplicatesMechanical(issues []*types.Issue) []DuplicateGroup { + // Group by normalized title + titleMap := make(map[string][]*types.Issue) + + for _, issue := range issues { + normalized := normalizeTitle(issue.Title) + titleMap[normalized] = append(titleMap[normalized], issue) + } + + var groups []DuplicateGroup + for _, group := range titleMap { + if len(group) > 1 { + groups = append(groups, DuplicateGroup{ + Issues: group, + Similarity: 1.0, // Exact match + Reason: "Identical titles", + }) + } + } + + return groups +} + +func findDuplicatesWithAI(ctx context.Context, issues []*types.Issue, threshold float64) ([]DuplicateGroup, error) { + aiClient, err := ai.NewClient() + if err != nil { + return nil, fmt.Errorf("AI client unavailable: %v (set BEADS_AI_API_KEY)", err) + } + + var groups []DuplicateGroup + + // Compare all pairs (N^2, but issues typically <1000) + for i := 0; i < len(issues); i++ { + for j := i + 1; j < len(issues); j++ { + similarity, reason, err := compareIssues(ctx, aiClient, issues[i], issues[j]) + if err != nil { + continue // Skip on error + } + + if similarity >= threshold { + groups = append(groups, DuplicateGroup{ + Issues: []*types.Issue{issues[i], issues[j]}, + Similarity: similarity, + Reason: reason, + }) + } + } + } + + return groups, nil +} + +func compareIssues(ctx context.Context, client *ai.Client, issue1, issue2 *types.Issue) (float64, string, error) { + prompt := fmt.Sprintf(` +Compare these two issues and determine if they are duplicates. 
+ +Issue 1: %s +Title: %s +Description: %s +Labels: %v +Status: %s + +Issue 2: %s +Title: %s +Description: %s +Labels: %v +Status: %s + +Respond in JSON: +{ + "similarity": 0.0-1.0, + "reason": "explanation", + "is_duplicate": true/false +} +`, issue1.ID, issue1.Title, issue1.Description, issue1.Labels, issue1.Status, + issue2.ID, issue2.Title, issue2.Description, issue2.Labels, issue2.Status) + + response, err := client.Complete(ctx, prompt) + if err != nil { + return 0, "", err + } + + var result struct { + Similarity float64 `json:"similarity"` + Reason string `json:"reason"` + IsDuplicate bool `json:"is_duplicate"` + } + + if err := json.Unmarshal([]byte(response), &result); err != nil { + return 0, "", err + } + + return result.Similarity, result.Reason, nil +} +``` + +**Optimization for large databases:** + +For databases with >1000 issues, N^2 comparison is too slow. Use **embedding-based similarity**: + +```go +// Use OpenAI embeddings or local model +func findDuplicatesWithEmbeddings(ctx context.Context, issues []*types.Issue, threshold float64) ([]DuplicateGroup, error) { + // 1. Generate embeddings for all issues + embeddings := make([][]float64, len(issues)) + for i, issue := range issues { + text := fmt.Sprintf("%s\n%s", issue.Title, issue.Description) + embedding, err := generateEmbedding(ctx, text) + if err != nil { + return nil, err + } + embeddings[i] = embedding + } + + // 2. 
Find similar pairs using cosine similarity + var groups []DuplicateGroup + for i := 0; i < len(embeddings); i++ { + for j := i + 1; j < len(embeddings); j++ { + similarity := cosineSimilarity(embeddings[i], embeddings[j]) + if similarity >= threshold { + groups = append(groups, DuplicateGroup{ + Issues: []*types.Issue{issues[i], issues[j]}, + Similarity: similarity, + Reason: "Semantic similarity via embeddings", + }) + } + } + } + + return groups, nil +} + +func generateEmbedding(ctx context.Context, text string) ([]float64, error) { + // Use OpenAI text-embedding-3-small or local model + // Returns 1536-dimensional vector + return nil, nil +} + +func cosineSimilarity(a, b []float64) float64 { + var dotProduct, normA, normB float64 + for i := range a { + dotProduct += a[i] * b[i] + normA += a[i] * a[i] + normB += b[i] * b[i] + } + return dotProduct / (math.Sqrt(normA) * math.Sqrt(normB)) +} +``` + +**Example usage:** + +```bash +# Find duplicates (mechanical, no AI) +$ bd find-duplicates --no-ai +Found 2 potential duplicate groups: + +Group 1 (Similarity: 100%): + bd-42: "Fix memory leak in parser" + bd-87: "Fix memory leak in parser" + Reason: Identical titles + +Group 2 (Similarity: 100%): + bd-103: "Update documentation" + bd-145: "Update documentation" + Reason: Identical titles + +# Find duplicates with AI (semantic) +$ bd find-duplicates --ai --threshold 0.75 +Found 4 potential duplicate groups: + +Group 1 (Similarity: 95%): + bd-42: "Fix memory leak in parser" + bd-87: "Parser memory leak needs fixing" + Reason: Same issue described differently + +Group 2 (Similarity: 88%): + bd-103: "Update API documentation" + bd-145: "Document new API endpoints" + Reason: Both about API docs, overlapping scope + +Group 3 (Similarity: 82%): + bd-200: "Optimize database queries" + bd-234: "Improve query performance" + Reason: Same goal (performance), different wording + +Group 4 (Similarity: 76%): + bd-301: "Add user authentication" + bd-312: "Implement login system" + 
Reason: Authentication and login are related features + +# Merge duplicates interactively +$ bd find-duplicates --merge +Found 2 duplicate groups. Review each: + +Group 1 (Similarity: 95%): + bd-42: "Fix memory leak in parser" (alice, 2025-10-20) + Status: in_progress + Labels: bug, performance + Comments: 3 + + bd-87: "Parser memory leak needs fixing" (bob, 2025-10-21) + Status: open + Labels: bug + Comments: 1 + +Merge these issues? [y/N] y + +Choose canonical issue: + 1) bd-42 (more activity, earlier) + 2) bd-87 +Your choice [1-2]: 1 + +✓ Merged bd-87 → bd-42 + - Moved 1 comment from bd-87 + - Added note: "Duplicate of bd-42" + - Closed bd-87 with reason: "duplicate" + +Continue to next group? [Y/n]: +``` + +### 3. `bd detect-pollution` - Test Issue Detector + +**Purpose:** Identify and clean up test issues that leaked into production database + +**Usage:** +```bash +# Detect test issues +bd detect-pollution + +# Auto-delete with confirmation +bd detect-pollution --clean + +# Export pollution report +bd detect-pollution --report pollution.json +``` + +**Implementation:** + +```go +// cmd/bd/detect_pollution.go (new file) +package main + +import ( + "context" + "regexp" + "strings" + + "github.com/steveyegge/beads/internal/storage" + "github.com/steveyegge/beads/internal/types" +) + +type PollutionIndicator struct { + Pattern string + Weight float64 +} + +var pollutionPatterns = []PollutionIndicator{ + {Pattern: `^test[-_]`, Weight: 0.9}, // "test-issue-1" + {Pattern: `^benchmark[-_]`, Weight: 0.95}, // "benchmark-issue-42" + {Pattern: `^(?i)test\s+issue`, Weight: 0.85}, // "Test Issue 123" + {Pattern: `^(?i)dummy`, Weight: 0.8}, // "Dummy issue" + {Pattern: `^(?i)sample`, Weight: 0.7}, // "Sample issue" + {Pattern: `^(?i)todo.*test`, Weight: 0.75}, // "TODO test something" + {Pattern: `^issue\s+\d+$`, Weight: 0.6}, // "issue 123" + {Pattern: `^[A-Z]{4,}-\d+$`, Weight: 0.5}, // "JIRA-123" (might be import) +} + +func detectPollution(ctx context.Context, store 
storage.Storage, useAI bool) ([]*types.Issue, error) { + allIssues, err := store.ListIssues(ctx, storage.ListOptions{}) + if err != nil { + return nil, err + } + + if !useAI { + // Mechanical approach: pattern matching + return detectPollutionMechanical(allIssues), nil + } + + // AI approach: semantic classification + return detectPollutionWithAI(ctx, allIssues) +} + +func detectPollutionMechanical(issues []*types.Issue) []*types.Issue { + var polluted []*types.Issue + + for _, issue := range issues { + score := 0.0 + + // Check title against patterns + for _, indicator := range pollutionPatterns { + matched, _ := regexp.MatchString(indicator.Pattern, issue.Title) + if matched { + score = max(score, indicator.Weight) + } + } + + // Additional heuristics + if len(issue.Title) < 10 { + score += 0.2 // Very short titles suspicious + } + + if issue.Description == "" || issue.Description == issue.Title { + score += 0.1 // No description + } + + if strings.Count(issue.Title, "test") > 1 { + score += 0.2 // Multiple "test" occurrences + } + + // Threshold: 0.7 + if score >= 0.7 { + polluted = append(polluted, issue) + } + } + + return polluted +} + +func detectPollutionWithAI(ctx context.Context, issues []*types.Issue) ([]*types.Issue, error) { + aiClient, err := ai.NewClient() + if err != nil { + return nil, err + } + + // Batch issues for efficiency (classify 50 at a time) + batchSize := 50 + var polluted []*types.Issue + + for i := 0; i < len(issues); i += batchSize { + end := min(i+batchSize, len(issues)) + batch := issues[i:end] + + prompt := buildPollutionPrompt(batch) + response, err := aiClient.Complete(ctx, prompt) + if err != nil { + return nil, err + } + + // Parse response: list of issue IDs classified as test pollution + pollutedIDs, err := parsePollutionResponse(response) + if err != nil { + continue + } + + for _, issue := range batch { + for _, id := range pollutedIDs { + if issue.ID == id { + polluted = append(polluted, issue) + } + } + } + } + + return 
polluted, nil +} + +func buildPollutionPrompt(issues []*types.Issue) string { + var builder strings.Builder + builder.WriteString("Identify test pollution in this issue list. Test issues have patterns like:\n") + builder.WriteString("- Titles starting with 'test', 'benchmark', 'sample'\n") + builder.WriteString("- Sequential numbering (test-1, test-2, ...)\n") + builder.WriteString("- Generic descriptions or no description\n") + builder.WriteString("- Created in rapid succession\n\n") + builder.WriteString("Issues:\n") + + for _, issue := range issues { + fmt.Fprintf(&builder, "%s: %s (created: %s)\n", issue.ID, issue.Title, issue.CreatedAt) + } + + builder.WriteString("\nRespond with JSON list of polluted issue IDs: {\"polluted\": [\"bd-1\", \"bd-2\"]}") + return builder.String() +} +``` + +**Example usage:** + +```bash +# Detect pollution +$ bd detect-pollution +Scanning 523 issues for test pollution... + +Found 47 potential test issues: + +High Confidence (score ≥ 0.9): + bd-100: "test-issue-1" + bd-101: "test-issue-2" + ... + bd-144: "benchmark-create-47" + (Total: 45 issues) + +Medium Confidence (score 0.7-0.9): + bd-200: "Quick test" + bd-301: "sample issue for testing" + (Total: 2 issues) + +Recommendation: Review and clean up these issues. +Run 'bd detect-pollution --clean' to delete them (with confirmation). + +# Clean up +$ bd detect-pollution --clean +Found 47 test issues. Delete them? [y/N] y + +Deleting 47 issues... +✓ Deleted bd-100 through bd-144 +✓ Deleted bd-200, bd-301 + +Cleanup complete. Exported deleted issues to .beads/pollution-backup.jsonl +(Run 'bd import .beads/pollution-backup.jsonl' to restore if needed) +``` + +### 4. 
`bd repair-deps` - Orphaned Dependency Cleaner + +**Purpose:** Find and fix orphaned dependency references + +**Usage:** +```bash +# Find orphans +bd repair-deps + +# Auto-fix (remove orphaned references) +bd repair-deps --fix + +# Interactive +bd repair-deps --interactive +``` + +**Implementation:** + +```go +// cmd/bd/repair_deps.go (new file) +package main + +import ( + "context" + "fmt" + + "github.com/steveyegge/beads/internal/storage" + "github.com/steveyegge/beads/internal/types" +) + +type OrphanedDependency struct { + Issue *types.Issue + OrphanedID string +} + +func findOrphanedDeps(ctx context.Context, store storage.Storage) ([]OrphanedDependency, error) { + allIssues, err := store.ListIssues(ctx, storage.ListOptions{}) + if err != nil { + return nil, err + } + + // Build ID existence map + existingIDs := make(map[string]bool) + for _, issue := range allIssues { + existingIDs[issue.ID] = true + } + + // Find orphans + var orphaned []OrphanedDependency + for _, issue := range allIssues { + for _, depID := range issue.DependsOn { + if !existingIDs[depID] { + orphaned = append(orphaned, OrphanedDependency{ + Issue: issue, + OrphanedID: depID, + }) + } + } + } + + return orphaned, nil +} + +func repairOrphanedDeps(ctx context.Context, store storage.Storage, orphaned []OrphanedDependency, autoFix bool) error { + for _, o := range orphaned { + if autoFix { + // Remove orphaned dependency + newDeps := removeString(o.Issue.DependsOn, o.OrphanedID) + o.Issue.DependsOn = newDeps + + if err := store.UpdateIssue(ctx, o.Issue); err != nil { + return err + } + + fmt.Printf("✓ Removed orphaned dependency %s from %s\n", o.OrphanedID, o.Issue.ID) + } else { + fmt.Printf("Found orphan: %s depends on non-existent %s\n", o.Issue.ID, o.OrphanedID) + } + } + + return nil +} +``` + +**Example usage:** + +```bash +# Find orphaned deps +$ bd repair-deps +Scanning dependencies... 
+ +Found 3 orphaned dependencies: + + bd-42: depends on bd-10 (deleted) + bd-87: depends on bd-25 (deleted) + bd-103: depends on bd-25 (deleted) + +Run 'bd repair-deps --fix' to remove these references. + +# Auto-fix +$ bd repair-deps --fix +✓ Removed bd-10 from bd-42 dependencies +✓ Removed bd-25 from bd-87 dependencies +✓ Removed bd-25 from bd-103 dependencies + +Repaired 3 issues. +``` + +### 5. `bd validate` - Comprehensive Health Check + +**Purpose:** Run all validation checks in one command + +**Usage:** +```bash +# Run all checks +bd validate + +# Auto-fix all issues +bd validate --fix-all + +# Specific checks +bd validate --checks=duplicates,orphans,pollution +``` + +**Implementation:** + +```go +// cmd/bd/validate.go (new file) +package main + +import ( + "context" + "fmt" + + "github.com/steveyegge/beads/internal/storage" +) + +func runValidation(ctx context.Context, store storage.Storage, checks []string, autoFix bool) error { + results := ValidationResults{} + + for _, check := range checks { + switch check { + case "duplicates": + groups, err := findDuplicates(ctx, store, false, 1.0) + if err != nil { + return err + } + results.Duplicates = len(groups) + + case "orphans": + orphaned, err := findOrphanedDeps(ctx, store) + if err != nil { + return err + } + results.Orphans = len(orphaned) + if autoFix { + repairOrphanedDeps(ctx, store, orphaned, true) + } + + case "pollution": + polluted, err := detectPollution(ctx, store, false) + if err != nil { + return err + } + results.Pollution = len(polluted) + + case "conflicts": + jsonlPath := findJSONLPath() + conflicts, err := detectConflicts(jsonlPath) + if err != nil { + return err + } + results.Conflicts = len(conflicts) + } + } + + results.Print() + return nil +} + +type ValidationResults struct { + Duplicates int + Orphans int + Pollution int + Conflicts int +} + +func (r ValidationResults) Print() { + fmt.Println("\nValidation Results:") + fmt.Println("===================") + fmt.Printf("Duplicates: 
%d\n", r.Duplicates) + fmt.Printf("Orphans: %d\n", r.Orphans) + fmt.Printf("Pollution: %d\n", r.Pollution) + fmt.Printf("Conflicts: %d\n", r.Conflicts) + + total := r.Duplicates + r.Orphans + r.Pollution + r.Conflicts + if total == 0 { + fmt.Println("\n✓ Database is healthy!") + } else { + fmt.Printf("\n⚠ Found %d issues to fix\n", total) + } +} +``` + +**Example usage:** + +```bash +$ bd validate +Running validation checks... + +✓ Checking for duplicates... found 2 groups +✓ Checking for orphaned dependencies... found 3 +✓ Checking for test pollution... found 0 +✓ Checking for git conflicts... found 1 + +Validation Results: +=================== +Duplicates: 2 +Orphans: 3 +Pollution: 0 +Conflicts: 1 + +⚠ Found 6 issues to fix + +Recommendations: + - Run 'bd find-duplicates --merge' to handle duplicates + - Run 'bd repair-deps --fix' to remove orphaned dependencies + - Run 'bd resolve-conflicts' to resolve git conflicts + +$ bd validate --fix-all +Running validation with auto-fix... +✓ Fixed 3 orphaned dependencies +✓ Resolved 1 git conflict (mechanical) + +2 duplicate groups require manual review. +Run 'bd find-duplicates --merge' to handle them interactively. 
+``` + +## Agent Integration + +### MCP Server Functions + +Add these as MCP functions for easy agent access: + +```python +# integrations/beads-mcp/src/beads_mcp/server.py + +@server.call_tool() +async def beads_resolve_conflicts(auto: bool = False, ai: bool = True) -> list: + """Resolve git merge conflicts in JSONL file""" + result = subprocess.run( + ["bd", "resolve-conflicts"] + + (["--auto"] if auto else []) + + (["--ai"] if ai else []) + + ["--json"], + capture_output=True, + text=True + ) + return json.loads(result.stdout) + +@server.call_tool() +async def beads_find_duplicates(ai: bool = True, threshold: float = 0.8) -> list: + """Find duplicate issues using AI or mechanical matching""" + result = subprocess.run( + ["bd", "find-duplicates"] + + (["--ai"] if ai else ["--no-ai"]) + + ["--threshold", str(threshold), "--json"], + capture_output=True, + text=True + ) + return json.loads(result.stdout) + +@server.call_tool() +async def beads_detect_pollution() -> list: + """Detect test issues that leaked into production""" + result = subprocess.run( + ["bd", "detect-pollution", "--json"], + capture_output=True, + text=True + ) + return json.loads(result.stdout) + +@server.call_tool() +async def beads_validate(fix_all: bool = False) -> dict: + """Run all validation checks""" + result = subprocess.run( + ["bd", "validate"] + + (["--fix-all"] if fix_all else []) + + ["--json"], + capture_output=True, + text=True + ) + return json.loads(result.stdout) +``` + +### Agent Workflow + +**Typical agent repair workflow:** + +``` +1. Agent notices issue (e.g., git merge conflict error) +2. Agent calls: mcp__beads__resolve_conflicts(auto=True, ai=True) +3. If successful: + - Agent reports: "Resolved 3 conflicts, remapped 1 ID" + - Agent continues work +4. If fails: + - Agent calls: mcp__beads__resolve_conflicts() for report + - Agent asks user for guidance +``` + +**Proactive validation:** + +``` +At session start, agent can: +1. Call: mcp__beads__validate() +2. 
If issues found: + - Report to user: "Found 3 orphaned deps and 2 duplicates" + - Ask: "Should I fix these?" +3. If user approves: + - Call: mcp__beads__validate(fix_all=True) + - Report: "Fixed 3 orphans, 2 duplicates need manual review" +``` + +## Cost Considerations + +### AI API Costs + +**Claude 3.5 Sonnet pricing (2025):** +- Input: $3.00 / 1M tokens +- Output: $15.00 / 1M tokens + +**Typical usage:** + +1. **Resolve conflicts** (~500 tokens per conflict) + - Cost: ~$0.0075 per conflict + - 10 conflicts/day = $0.075/day = $2.25/month + +2. **Find duplicates** (~200 tokens per comparison) + - Cost: ~$0.003 per comparison + - 100 issues = 4,950 comparisons = $15/run + - **Too expensive!** Use embeddings instead + +3. **Embeddings approach** (text-embedding-3-small) + - $0.02 / 1M tokens + - 100 issues × 100 tokens = 10K tokens = $0.0002/run + - **Much cheaper!** + +**Recommendations:** +- Use AI for conflict resolution (low frequency, high value) +- Use embeddings for duplicate detection (high frequency, needs scale) +- Use mechanical checks by default, AI as opt-in + +### Local AI Option + +For users who want to avoid API costs: + +```bash +# Use Ollama (free, local) +BEADS_AI_PROVIDER=ollama +BEADS_AI_MODEL=llama3.2 + +# Or use local embedding model +BEADS_EMBEDDING_PROVIDER=local +BEADS_EMBEDDING_MODEL=all-MiniLM-L6-v2 # 384-dimensional, fast +``` + +## Implementation Roadmap + +### Phase 1: Mechanical Commands (2-3 weeks) +- [ ] `bd repair-deps` (orphaned dependency cleaner) +- [ ] `bd detect-pollution` (pattern-based test detection) +- [ ] `bd resolve-conflicts` (mechanical ID remapping) +- [ ] `bd validate` (run all checks) + +### Phase 2: AI Integration (2-3 weeks) +- [ ] Add `internal/ai` package +- [ ] Implement Anthropic, OpenAI, Ollama providers +- [ ] Add `--ai` flag to commands +- [ ] Test with real conflicts/duplicates + +### Phase 3: Embeddings (1-2 weeks) +- [ ] Add embedding generation +- [ ] Implement cosine similarity search +- [ ] Optimize 
for large databases (>1K issues) +- [ ] Benchmark performance + +### Phase 4: MCP Integration (1 week) +- [ ] Add MCP functions for all repair commands +- [ ] Update beads-mcp documentation +- [ ] Add examples to AGENTS.md + +### Phase 5: Polish (1 week) +- [ ] Add `--json` output for all commands +- [ ] Improve error messages +- [ ] Add progress indicators for slow operations +- [ ] Write comprehensive tests + +**Total timeline: 7-10 weeks** + +## Success Metrics + +### Quantitative +- ✅ Agent repair time reduced by >50% +- ✅ Manual interventions reduced by >70% +- ✅ Conflict resolution time <30 seconds +- ✅ Duplicate detection accuracy >90% + +### Qualitative +- ✅ Agents report fewer "stuck" situations +- ✅ Users spend less time on database maintenance +- ✅ Fewer support requests about database issues + +## Open Questions + +1. **Should repair commands auto-run in daemon?** + - Recommendation: No, too risky. On-demand only. + +2. **Should agents proactively run validation?** + - Recommendation: Yes, at session start (with user notification) + +3. **What AI provider should be default?** + - Recommendation: None (mechanical by default), user opts in + +4. **Should duplicate detection be continuous?** + - Recommendation: No, run on-demand or weekly scheduled + +5. **How to handle false positives in pollution detection?** + - Recommendation: Always confirm before deleting, backup to JSONL + +## Conclusion + +Repair commands address the **root cause of agent repair burden**: lack of specialized tools for common maintenance tasks. 
By providing `bd resolve-conflicts`, `bd find-duplicates`, `bd detect-pollution`, and `bd validate`, we: + +✅ Reduce agent time from 5-10 minutes to <30 seconds per repair +✅ Provide consistent repair logic across sessions +✅ Enable proactive validation instead of reactive fixing +✅ Allow AI assistance where valuable (conflicts, duplicates) while keeping mechanical checks fast + +Combined with the event-driven daemon (instant feedback), these tools should significantly reduce the "not as much in the background as I'd like" pain.