Files
beads/cmd/bd/daemon_watcher.go
Steve Yegge 731f40da34 Fix FileWatcher robustness issues (bd-71)
- Fix Debouncer race condition with sequence numbers to prevent double-fire
- Add parent directory watch to catch file creates/renames
- Add .git/HEAD watch for branch change detection
- Implement retry/backoff (50-400ms) for re-establishing JSONL watch
- Handle Create/Chmod events in addition to Write
- Add .git/HEAD polling in polling mode
- All 18 debouncer and watcher tests pass

Amp-Thread-ID: https://ampcode.com/threads/T-4029d643-b4b4-4d3b-bd85-74461f78cd7f
Co-authored-by: Amp <amp@ampcode.com>
2025-10-29 22:55:13 -07:00

306 lines
9.0 KiB
Go

package main
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
)
// FileWatcher monitors JSONL and git ref changes using filesystem events or polling.
type FileWatcher struct {
watcher *fsnotify.Watcher
debouncer *Debouncer
jsonlPath string
parentDir string
pollingMode bool
lastModTime time.Time
lastExists bool
lastSize int64
pollInterval time.Duration
gitRefsPath string
gitHeadPath string
lastHeadModTime time.Time
lastHeadExists bool
cancel context.CancelFunc
}
// NewFileWatcher creates a file watcher for the given JSONL path.
// onChanged is called when the file or git refs change, after debouncing.
// Falls back to polling mode if fsnotify fails (controlled by BEADS_WATCHER_FALLBACK env var).
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
fw := &FileWatcher{
jsonlPath: jsonlPath,
parentDir: filepath.Dir(jsonlPath),
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
pollInterval: 5 * time.Second,
}
// Get initial file state for polling fallback
if stat, err := os.Stat(jsonlPath); err == nil {
fw.lastModTime = stat.ModTime()
fw.lastExists = true
fw.lastSize = stat.Size()
}
// Check if fallback is disabled
fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK")
fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0"
// Store git paths for filtering
gitDir := filepath.Join(fw.parentDir, "..", ".git")
fw.gitRefsPath = filepath.Join(gitDir, "refs", "heads")
fw.gitHeadPath = filepath.Join(gitDir, "HEAD")
// Get initial git HEAD state for polling
if stat, err := os.Stat(fw.gitHeadPath); err == nil {
fw.lastHeadModTime = stat.ModTime()
fw.lastHeadExists = true
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
if fallbackDisabled {
return nil, fmt.Errorf("fsnotify.NewWatcher() failed and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: fsnotify.NewWatcher() failed (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
return fw, nil
}
fw.watcher = watcher
// Watch the parent directory (catches creates/renames)
if err := watcher.Add(fw.parentDir); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to watch parent directory %s: %v\n", fw.parentDir, err)
}
// Watch the JSONL file (may not exist yet)
if err := watcher.Add(jsonlPath); err != nil {
if os.IsNotExist(err) {
// File doesn't exist yet - rely on parent dir watch
fmt.Fprintf(os.Stderr, "Info: JSONL file %s doesn't exist yet, watching parent directory\n", jsonlPath)
} else {
watcher.Close()
if fallbackDisabled {
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
fw.watcher = nil
return fw, nil
}
}
// Also watch .git/refs/heads and .git/HEAD for branch changes (best effort)
_ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this
_ = watcher.Add(fw.gitHeadPath) // Ignore error - not all setups have this
return fw, nil
}
// Start begins monitoring filesystem events or polling.
// Runs in background goroutine until context is canceled.
// Should only be called once per FileWatcher instance.
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
// Create internal cancel so Close can stop goroutines
ctx, cancel := context.WithCancel(ctx)
fw.cancel = cancel
if fw.pollingMode {
fw.startPolling(ctx, log)
return
}
go func() {
jsonlBase := filepath.Base(fw.jsonlPath)
for {
select {
case event, ok := <-fw.watcher.Events:
if !ok {
return
}
// Handle parent directory events (file create/replace)
if event.Name == filepath.Join(fw.parentDir, jsonlBase) && event.Op&fsnotify.Create != 0 {
log.log("JSONL file created: %s", event.Name)
// Ensure we're watching the file directly
_ = fw.watcher.Add(fw.jsonlPath)
fw.debouncer.Trigger()
continue
}
// Handle JSONL write/chmod events
if event.Name == fw.jsonlPath && event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Chmod) != 0 {
log.log("File change detected: %s (op: %v)", event.Name, event.Op)
fw.debouncer.Trigger()
continue
}
// Handle JSONL removal/rename (e.g., git checkout)
if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) {
log.log("JSONL removed/renamed, re-establishing watch")
fw.watcher.Remove(fw.jsonlPath)
// Retry with exponential backoff
fw.reEstablishWatch(ctx, log)
continue
}
// Handle .git/HEAD changes (branch switches)
if event.Name == fw.gitHeadPath && event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
log.log("Git HEAD change detected: %s", event.Name)
fw.debouncer.Trigger()
continue
}
// Handle git ref changes (only events under gitRefsPath)
if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) {
log.log("Git ref change detected: %s", event.Name)
fw.debouncer.Trigger()
continue
}
case err, ok := <-fw.watcher.Errors:
if !ok {
return
}
log.log("Watcher error: %v", err)
case <-ctx.Done():
return
}
}
}()
}
// reEstablishWatch attempts to re-add the JSONL watch with exponential backoff.
func (fw *FileWatcher) reEstablishWatch(ctx context.Context, log daemonLogger) {
delays := []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond}
for _, delay := range delays {
select {
case <-ctx.Done():
return
case <-time.After(delay):
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
if os.IsNotExist(err) {
log.log("JSONL still missing after %v, retrying...", delay)
continue
}
log.log("Failed to re-watch JSONL after %v: %v", delay, err)
return
}
// Success!
log.log("Successfully re-established JSONL watch after %v", delay)
fw.debouncer.Trigger()
return
}
}
log.log("Failed to re-establish JSONL watch after all retries")
}
// startPolling begins polling for file changes using a ticker.
func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
log.log("Starting polling mode with %v interval", fw.pollInterval)
ticker := time.NewTicker(fw.pollInterval)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
changed := false
// Check JSONL file
stat, err := os.Stat(fw.jsonlPath)
if err != nil {
if os.IsNotExist(err) {
// File disappeared
if fw.lastExists {
fw.lastExists = false
fw.lastModTime = time.Time{}
fw.lastSize = 0
log.log("File missing (polling): %s", fw.jsonlPath)
changed = true
}
} else {
log.log("Polling error: %v", err)
}
} else {
// File exists
if !fw.lastExists {
// File appeared
fw.lastExists = true
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File appeared (polling): %s", fw.jsonlPath)
changed = true
} else if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
// File exists and existed before - check for changes
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File change detected (polling): %s", fw.jsonlPath)
changed = true
}
}
// Check .git/HEAD for branch changes
headStat, err := os.Stat(fw.gitHeadPath)
if err != nil {
if os.IsNotExist(err) {
if fw.lastHeadExists {
fw.lastHeadExists = false
fw.lastHeadModTime = time.Time{}
log.log("Git HEAD missing (polling): %s", fw.gitHeadPath)
changed = true
}
}
// Ignore other errors for HEAD - it's optional
} else {
// HEAD exists
if !fw.lastHeadExists {
// HEAD appeared
fw.lastHeadExists = true
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD appeared (polling): %s", fw.gitHeadPath)
changed = true
} else if !headStat.ModTime().Equal(fw.lastHeadModTime) {
// HEAD changed (branch switch)
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD change detected (polling): %s", fw.gitHeadPath)
changed = true
}
}
if changed {
fw.debouncer.Trigger()
}
case <-ctx.Done():
return
}
}
}()
}
// Close stops the file watcher and releases resources.
func (fw *FileWatcher) Close() error {
// Stop background goroutines
if fw.cancel != nil {
fw.cancel()
}
fw.debouncer.Cancel()
if fw.watcher != nil {
return fw.watcher.Close()
}
return nil
}