Fix FileWatcher robustness issues (bd-71)

- Fix Debouncer race condition with sequence numbers to prevent double-fire
- Add parent directory watch to catch file creates/renames
- Add .git/HEAD watch for branch change detection
- Implement retry/backoff (50-400ms) for re-establishing JSONL watch
- Handle Create/Chmod events in addition to Write
- Add .git/HEAD polling in polling mode
- All 18 debouncer and watcher tests pass

Amp-Thread-ID: https://ampcode.com/threads/T-4029d643-b4b4-4d3b-bd85-74461f78cd7f
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-10-29 22:55:13 -07:00
parent 0f1b597961
commit 731f40da34
2 changed files with 165 additions and 57 deletions

View File

@@ -12,6 +12,7 @@ type Debouncer struct {
timer *time.Timer timer *time.Timer
duration time.Duration duration time.Duration
action func() action func()
seq uint64 // Sequence number to prevent stale timer fires
} }
// NewDebouncer creates a new debouncer with the given duration and action. // NewDebouncer creates a new debouncer with the given duration and action.
@@ -34,11 +35,21 @@ func (d *Debouncer) Trigger() {
d.timer.Stop() d.timer.Stop()
} }
// Increment sequence number to invalidate any pending timers
d.seq++
currentSeq := d.seq
d.timer = time.AfterFunc(d.duration, func() { d.timer = time.AfterFunc(d.duration, func() {
d.action()
d.mu.Lock() d.mu.Lock()
d.timer = nil defer d.mu.Unlock()
d.mu.Unlock()
// Only fire if this is still the latest trigger
if d.seq == currentSeq {
d.timer = nil
d.mu.Unlock() // Unlock before calling action to avoid holding lock during callback
d.action()
d.mu.Lock() // Re-lock for defer
}
}) })
} }

View File

@@ -13,16 +13,20 @@ import (
// FileWatcher monitors JSONL and git ref changes using filesystem events or polling. // FileWatcher monitors JSONL and git ref changes using filesystem events or polling.
type FileWatcher struct { type FileWatcher struct {
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
debouncer *Debouncer debouncer *Debouncer
jsonlPath string jsonlPath string
pollingMode bool parentDir string
lastModTime time.Time pollingMode bool
lastExists bool lastModTime time.Time
lastSize int64 lastExists bool
pollInterval time.Duration lastSize int64
gitRefsPath string pollInterval time.Duration
cancel context.CancelFunc gitRefsPath string
gitHeadPath string
lastHeadModTime time.Time
lastHeadExists bool
cancel context.CancelFunc
} }
// NewFileWatcher creates a file watcher for the given JSONL path. // NewFileWatcher creates a file watcher for the given JSONL path.
@@ -31,6 +35,7 @@ type FileWatcher struct {
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) { func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
fw := &FileWatcher{ fw := &FileWatcher{
jsonlPath: jsonlPath, jsonlPath: jsonlPath,
parentDir: filepath.Dir(jsonlPath),
debouncer: NewDebouncer(500*time.Millisecond, onChanged), debouncer: NewDebouncer(500*time.Millisecond, onChanged),
pollInterval: 5 * time.Second, pollInterval: 5 * time.Second,
} }
@@ -46,8 +51,16 @@ func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK") fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK")
fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0" fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0"
// Store git refs path for filtering // Store git paths for filtering
fw.gitRefsPath = filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads") gitDir := filepath.Join(fw.parentDir, "..", ".git")
fw.gitRefsPath = filepath.Join(gitDir, "refs", "heads")
fw.gitHeadPath = filepath.Join(gitDir, "HEAD")
// Get initial git HEAD state for polling
if stat, err := os.Stat(fw.gitHeadPath); err == nil {
fw.lastHeadModTime = stat.ModTime()
fw.lastHeadExists = true
}
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
@@ -63,22 +76,33 @@ func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
fw.watcher = watcher fw.watcher = watcher
// Watch the JSONL file // Watch the parent directory (catches creates/renames)
if err := watcher.Add(jsonlPath); err != nil { if err := watcher.Add(fw.parentDir); err != nil {
watcher.Close() fmt.Fprintf(os.Stderr, "Warning: failed to watch parent directory %s: %v\n", fw.parentDir, err)
if fallbackDisabled {
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
fw.watcher = nil
return fw, nil
} }
// Also watch .git/refs/heads for branch changes (best effort) // Watch the JSONL file (may not exist yet)
if err := watcher.Add(jsonlPath); err != nil {
if os.IsNotExist(err) {
// File doesn't exist yet - rely on parent dir watch
fmt.Fprintf(os.Stderr, "Info: JSONL file %s doesn't exist yet, watching parent directory\n", jsonlPath)
} else {
watcher.Close()
if fallbackDisabled {
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
fw.watcher = nil
return fw, nil
}
}
// Also watch .git/refs/heads and .git/HEAD for branch changes (best effort)
_ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this _ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this
_ = watcher.Add(fw.gitHeadPath) // Ignore error - not all setups have this
return fw, nil return fw, nil
} }
@@ -97,6 +121,8 @@ func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
} }
go func() { go func() {
jsonlBase := filepath.Base(fw.jsonlPath)
for { for {
select { select {
case event, ok := <-fw.watcher.Events: case event, ok := <-fw.watcher.Events:
@@ -104,30 +130,43 @@ func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
return return
} }
// Handle JSONL write events // Handle parent directory events (file create/replace)
if event.Name == fw.jsonlPath && event.Op&fsnotify.Write != 0 { if event.Name == filepath.Join(fw.parentDir, jsonlBase) && event.Op&fsnotify.Create != 0 {
log.log("File change detected: %s", event.Name) log.log("JSONL file created: %s", event.Name)
// Ensure we're watching the file directly
_ = fw.watcher.Add(fw.jsonlPath)
fw.debouncer.Trigger() fw.debouncer.Trigger()
continue
}
// Handle JSONL write/chmod events
if event.Name == fw.jsonlPath && event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Chmod) != 0 {
log.log("File change detected: %s (op: %v)", event.Name, event.Op)
fw.debouncer.Trigger()
continue
} }
// Handle JSONL removal/rename (e.g., git checkout) // Handle JSONL removal/rename (e.g., git checkout)
if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) { if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) {
log.log("JSONL removed/renamed, re-establishing watch") log.log("JSONL removed/renamed, re-establishing watch")
fw.watcher.Remove(fw.jsonlPath) fw.watcher.Remove(fw.jsonlPath)
// Brief wait for file to be recreated // Retry with exponential backoff
time.Sleep(100 * time.Millisecond) fw.reEstablishWatch(ctx, log)
if err := fw.watcher.Add(fw.jsonlPath); err != nil { continue
log.log("Failed to re-watch JSONL: %v", err) }
} else {
// File was recreated, trigger to reload // Handle .git/HEAD changes (branch switches)
fw.debouncer.Trigger() if event.Name == fw.gitHeadPath && event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
} log.log("Git HEAD change detected: %s", event.Name)
fw.debouncer.Trigger()
continue
} }
// Handle git ref changes (only events under gitRefsPath) // Handle git ref changes (only events under gitRefsPath)
if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) { if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) {
log.log("Git ref change detected: %s", event.Name) log.log("Git ref change detected: %s", event.Name)
fw.debouncer.Trigger() fw.debouncer.Trigger()
continue
} }
case err, ok := <-fw.watcher.Errors: case err, ok := <-fw.watcher.Errors:
@@ -143,6 +182,32 @@ func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
}() }()
} }
// reEstablishWatch attempts to re-add the JSONL watch with exponential backoff.
func (fw *FileWatcher) reEstablishWatch(ctx context.Context, log daemonLogger) {
delays := []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond}
for _, delay := range delays {
select {
case <-ctx.Done():
return
case <-time.After(delay):
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
if os.IsNotExist(err) {
log.log("JSONL still missing after %v, retrying...", delay)
continue
}
log.log("Failed to re-watch JSONL after %v: %v", delay, err)
return
}
// Success!
log.log("Successfully re-established JSONL watch after %v", delay)
fw.debouncer.Trigger()
return
}
}
log.log("Failed to re-establish JSONL watch after all retries")
}
// startPolling begins polling for file changes using a ticker. // startPolling begins polling for file changes using a ticker.
func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) { func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
log.log("Starting polling mode with %v interval", fw.pollInterval) log.log("Starting polling mode with %v interval", fw.pollInterval)
@@ -152,6 +217,9 @@ func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
changed := false
// Check JSONL file
stat, err := os.Stat(fw.jsonlPath) stat, err := os.Stat(fw.jsonlPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@@ -161,32 +229,61 @@ func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
fw.lastModTime = time.Time{} fw.lastModTime = time.Time{}
fw.lastSize = 0 fw.lastSize = 0
log.log("File missing (polling): %s", fw.jsonlPath) log.log("File missing (polling): %s", fw.jsonlPath)
fw.debouncer.Trigger() changed = true
} }
continue } else {
log.log("Polling error: %v", err)
}
} else {
// File exists
if !fw.lastExists {
// File appeared
fw.lastExists = true
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File appeared (polling): %s", fw.jsonlPath)
changed = true
} else if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
// File exists and existed before - check for changes
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File change detected (polling): %s", fw.jsonlPath)
changed = true
} }
log.log("Polling error: %v", err)
continue
} }
// File exists // Check .git/HEAD for branch changes
if !fw.lastExists { headStat, err := os.Stat(fw.gitHeadPath)
// File appeared if err != nil {
fw.lastExists = true if os.IsNotExist(err) {
fw.lastModTime = stat.ModTime() if fw.lastHeadExists {
fw.lastSize = stat.Size() fw.lastHeadExists = false
log.log("File appeared (polling): %s", fw.jsonlPath) fw.lastHeadModTime = time.Time{}
fw.debouncer.Trigger() log.log("Git HEAD missing (polling): %s", fw.gitHeadPath)
continue changed = true
}
}
// Ignore other errors for HEAD - it's optional
} else {
// HEAD exists
if !fw.lastHeadExists {
// HEAD appeared
fw.lastHeadExists = true
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD appeared (polling): %s", fw.gitHeadPath)
changed = true
} else if !headStat.ModTime().Equal(fw.lastHeadModTime) {
// HEAD changed (branch switch)
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD change detected (polling): %s", fw.gitHeadPath)
changed = true
}
} }
// File exists and existed before - check for changes if changed {
if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File change detected (polling): %s", fw.jsonlPath)
fw.debouncer.Trigger() fw.debouncer.Trigger()
} }
case <-ctx.Done(): case <-ctx.Done():
return return
} }