From e00f013bda87249491eb3df66cdcf2183385fa31 Mon Sep 17 00:00:00 2001 From: beads/crew/wickham Date: Tue, 20 Jan 2026 19:07:13 -0800 Subject: [PATCH] feat(activity): use fsnotify for real-time feed (hq-ew1mbr.17) Replace polling with filesystem watching for near-instant wake-up (<50ms vs 250ms avg). Watches .beads/dolt/.dolt/noms for Dolt commits. Falls back to polling if fsnotify unavailable. Co-Authored-By: Claude Opus 4.5 --- cmd/bd/activity.go | 88 +++++++++++++- cmd/bd/activity_watcher.go | 234 +++++++++++++++++++++++++++++++++++++ 2 files changed, 317 insertions(+), 5 deletions(-) create mode 100644 cmd/bd/activity_watcher.go diff --git a/cmd/bd/activity.go b/cmd/bd/activity.go index 1c604978..2acac5d6 100644 --- a/cmd/bd/activity.go +++ b/cmd/bd/activity.go @@ -152,7 +152,8 @@ func runActivityOnce(sinceTime time.Time) { } } -// runActivityFollow streams events in real-time +// runActivityFollow streams events in real-time using filesystem watching. +// Falls back to polling if fsnotify is not available. func runActivityFollow(sinceTime time.Time) { // Start from now if no --since specified lastPoll := time.Now().Add(-1 * time.Second) @@ -181,9 +182,19 @@ func runActivityFollow(sinceTime time.Time) { } } - // Poll for new events - ticker := time.NewTicker(activityInterval) - defer ticker.Stop() + // Create filesystem watcher for near-instant wake-up + // Falls back to polling internally if fsnotify fails + beadsDir := filepath.Dir(dbPath) + watcher, err := NewActivityWatcher(beadsDir, activityInterval) + if err != nil { + // Watcher creation failed entirely - fall back to legacy polling + runActivityFollowPolling(sinceTime, lastPoll) + return + } + defer watcher.Close() + + // Start watching + watcher.Start(rootCtx) // Track consecutive failures for error reporting consecutiveFailures := 0 @@ -194,7 +205,11 @@ func runActivityFollow(sinceTime time.Time) { select { case <-rootCtx.Done(): return - case <-ticker.C: + case _, ok := <-watcher.Events(): + if !ok { + return // Watcher closed + } + newEvents, err := fetchMutations(lastPoll) if err != nil { consecutiveFailures++ @@ -246,6 +261,69 @@ func runActivityFollow(sinceTime time.Time) { } } +// runActivityFollowPolling is the legacy polling-based follow mode. +// Used as fallback when ActivityWatcher cannot be created. +func runActivityFollowPolling(sinceTime time.Time, lastPoll time.Time) { + ticker := time.NewTicker(activityInterval) + defer ticker.Stop() + + consecutiveFailures := 0 + const failureWarningThreshold = 5 + lastWarningTime := time.Time{} + + for { + select { + case <-rootCtx.Done(): + return + case <-ticker.C: + newEvents, err := fetchMutations(lastPoll) + if err != nil { + consecutiveFailures++ + if consecutiveFailures >= failureWarningThreshold { + if time.Since(lastWarningTime) >= 30*time.Second { + if jsonOutput { + errorEvent := map[string]interface{}{ + "type": "error", + "message": fmt.Sprintf("daemon unreachable (%d failures)", consecutiveFailures), + "timestamp": time.Now().Format(time.RFC3339), + } + data, _ := json.Marshal(errorEvent) + fmt.Fprintln(os.Stderr, string(data)) + } else { + timestamp := time.Now().Format("15:04:05") + fmt.Fprintf(os.Stderr, "[%s] %s daemon unreachable (%d consecutive failures)\n", + timestamp, ui.RenderWarn("!"), consecutiveFailures) + } + lastWarningTime = time.Now() + } + } + continue + } + + if consecutiveFailures > 0 { + if consecutiveFailures >= failureWarningThreshold && !jsonOutput { + timestamp := time.Now().Format("15:04:05") + fmt.Fprintf(os.Stderr, "[%s] %s daemon reconnected\n", timestamp, ui.RenderPass("✓")) + } + consecutiveFailures = 0 + } + + newEvents = filterEvents(newEvents) + for _, e := range newEvents { + if jsonOutput { + data, _ := json.Marshal(formatEvent(e)) + fmt.Println(string(data)) + } else { + printEvent(e) + } + if e.Timestamp.After(lastPoll) { + lastPoll = e.Timestamp + } + } + } + } +} + // fetchMutations retrieves mutations from the daemon func fetchMutations(since time.Time) ([]rpc.MutationEvent, error) { var sinceMillis int64 diff --git a/cmd/bd/activity_watcher.go b/cmd/bd/activity_watcher.go new file mode 100644 index 00000000..ac994a81 --- /dev/null +++ b/cmd/bd/activity_watcher.go @@ -0,0 +1,234 @@ +package main + +import ( + "context" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +// ActivityWatcher monitors the beads directory for changes using filesystem events. +// Falls back to polling if fsnotify fails (some filesystems don't support it). +type ActivityWatcher struct { + watcher *fsnotify.Watcher + watchPaths []string // Paths being watched + pollingMode bool // True if using polling fallback + pollInterval time.Duration + events chan struct{} // Sends wake-up signals on changes + cancel context.CancelFunc + wg sync.WaitGroup + mu sync.Mutex + + // Polling state + lastModTimes map[string]time.Time +} + +// NewActivityWatcher creates a watcher for activity feed updates. +// Watches the dolt noms directory for commits, falling back to polling if fsnotify fails. +// The beadsDir should be the .beads directory path. +// The pollInterval is used for polling fallback mode. +func NewActivityWatcher(beadsDir string, pollInterval time.Duration) (*ActivityWatcher, error) { + aw := &ActivityWatcher{ + pollInterval: pollInterval, + events: make(chan struct{}, 1), // Buffered to avoid blocking + lastModTimes: make(map[string]time.Time), + } + + // Determine watch paths - prefer dolt noms directory if it exists + doltNomsPath := filepath.Join(beadsDir, "dolt", ".dolt", "noms") + doltPath := filepath.Join(beadsDir, "dolt", ".dolt") + jsonlPath := filepath.Join(beadsDir, "issues.jsonl") + + // Build list of paths to watch (in priority order) + var watchPaths []string + if stat, err := os.Stat(doltNomsPath); err == nil && stat.IsDir() { + // Watch dolt noms directory for commits + watchPaths = append(watchPaths, doltNomsPath) + } else if stat, err := os.Stat(doltPath); err == nil && stat.IsDir() { + // Fallback to .dolt directory + watchPaths = append(watchPaths, doltPath) + } + // Also watch JSONL for non-dolt or hybrid setups + if _, err := os.Stat(jsonlPath); err == nil { + watchPaths = append(watchPaths, jsonlPath) + } + // Watch the beads dir itself as last resort + if len(watchPaths) == 0 { + watchPaths = append(watchPaths, beadsDir) + } + + aw.watchPaths = watchPaths + + // Initialize modification times for polling + for _, p := range watchPaths { + if stat, err := os.Stat(p); err == nil { + aw.lastModTimes[p] = stat.ModTime() + } + } + + // Try to create fsnotify watcher + watcher, err := fsnotify.NewWatcher() + if err != nil { + // Fall back to polling mode + aw.pollingMode = true + return aw, nil + } + + // Add watches for each path + watchedAny := false + for _, p := range watchPaths { + if err := watcher.Add(p); err != nil { + // Log but continue - some paths may not be watchable + continue + } + watchedAny = true + } + + if !watchedAny { + // No paths could be watched, fall back to polling + _ = watcher.Close() + aw.pollingMode = true + return aw, nil + } + + aw.watcher = watcher + return aw, nil +} + +// Events returns the channel that receives wake-up signals when changes are detected. +// The channel sends an empty struct for each detected change (debounced). +func (aw *ActivityWatcher) Events() <-chan struct{} { + return aw.events +} + +// IsPolling returns true if the watcher is using polling fallback. +func (aw *ActivityWatcher) IsPolling() bool { + return aw.pollingMode +} + +// Start begins monitoring for changes. +// Returns immediately, monitoring happens in background goroutine. +func (aw *ActivityWatcher) Start(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + aw.cancel = cancel + + if aw.pollingMode { + aw.startPolling(ctx) + } else { + aw.startFSWatch(ctx) + } +} + +// startFSWatch starts fsnotify-based watching. +func (aw *ActivityWatcher) startFSWatch(ctx context.Context) { + aw.wg.Add(1) + go func() { + defer aw.wg.Done() + + // Debounce: don't send more than one event per 50ms + var lastEvent time.Time + debounceWindow := 50 * time.Millisecond + + for { + select { + case event, ok := <-aw.watcher.Events: + if !ok { + return + } + + // Only trigger on write events + if event.Op&fsnotify.Write == 0 && event.Op&fsnotify.Create == 0 { + continue + } + + // Debounce rapid events + now := time.Now() + if now.Sub(lastEvent) < debounceWindow { + continue + } + lastEvent = now + + // Send non-blocking wake-up signal + select { + case aw.events <- struct{}{}: + default: + // Channel already has a pending event + } + + case _, ok := <-aw.watcher.Errors: + if !ok { + return + } + // Log errors but continue watching + + case <-ctx.Done(): + return + } + } + }() +} + +// startPolling starts polling-based change detection. +func (aw *ActivityWatcher) startPolling(ctx context.Context) { + aw.wg.Add(1) + go func() { + defer aw.wg.Done() + + ticker := time.NewTicker(aw.pollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if aw.checkForChanges() { + // Send non-blocking wake-up signal + select { + case aw.events <- struct{}{}: + default: + } + } + + case <-ctx.Done(): + return + } + } + }() +} + +// checkForChanges checks if any watched paths have been modified. +func (aw *ActivityWatcher) checkForChanges() bool { + aw.mu.Lock() + defer aw.mu.Unlock() + + changed := false + for _, p := range aw.watchPaths { + stat, err := os.Stat(p) + if err != nil { + continue + } + + lastMod, exists := aw.lastModTimes[p] + if !exists || !stat.ModTime().Equal(lastMod) { + aw.lastModTimes[p] = stat.ModTime() + changed = true + } + } + return changed +} + +// Close stops the watcher and releases resources. +func (aw *ActivityWatcher) Close() error { + if aw.cancel != nil { + aw.cancel() + } + aw.wg.Wait() + close(aw.events) + if aw.watcher != nil { + return aw.watcher.Close() + } + return nil +} +