Fix bd-66: Add robust polling fallback for file watcher
- Detect fsnotify.NewWatcher() errors and auto-fallback to polling mode - Add BEADS_WATCHER_FALLBACK env var to control behavior (default: enabled) - Poll every 5 seconds with comprehensive change detection: - Track file existence, size, and mtime to catch all changes - Handle file disappearance/reappearance correctly - Trigger on file recreation even with older timestamps - Fix goroutine leak: Close() now stops background goroutines via cancel context - Tighten git refs filtering to only trigger for events under .git/refs/heads - Trigger after successful JSONL rewatch on rename/remove events - Improve logging to show actual poll interval in warnings All tests passing. Amp-Thread-ID: https://ampcode.com/threads/T-8f5edc23-4b78-4b80-b8f3-66050f45eb61 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -62,7 +62,7 @@
|
|||||||
{"id":"bd-63","title":"Add internal/ai package for LLM integration","description":"Shared AI client for repair commands.\n\nProviders:\n- Anthropic (Claude)\n- OpenAI (GPT)\n- Ollama (local)\n\nEnv vars:\n- BEADS_AI_PROVIDER\n- BEADS_AI_API_KEY\n- BEADS_AI_MODEL\n\nFiles: internal/ai/client.go (new)","status":"open","priority":1,"issue_type":"task","created_at":"2025-10-28T14:48:29.072473-07:00","updated_at":"2025-10-28T14:48:29.072473-07:00","dependencies":[{"issue_id":"bd-63","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:29.073553-07:00","created_by":"daemon"}]}
|
{"id":"bd-63","title":"Add internal/ai package for LLM integration","description":"Shared AI client for repair commands.\n\nProviders:\n- Anthropic (Claude)\n- OpenAI (GPT)\n- Ollama (local)\n\nEnv vars:\n- BEADS_AI_PROVIDER\n- BEADS_AI_API_KEY\n- BEADS_AI_MODEL\n\nFiles: internal/ai/client.go (new)","status":"open","priority":1,"issue_type":"task","created_at":"2025-10-28T14:48:29.072473-07:00","updated_at":"2025-10-28T14:48:29.072473-07:00","dependencies":[{"issue_id":"bd-63","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:29.073553-07:00","created_by":"daemon"}]}
|
||||||
{"id":"bd-64","title":"Add embedding generation for duplicate detection","description":"Use embeddings for scalable duplicate detection.\n\nModel: text-embedding-3-small (OpenAI) or all-MiniLM-L6-v2 (local)\nStorage: SQLite vector extension or in-memory\nCost: ~/bin/bash.0002 per 100 issues\n\nMuch cheaper than LLM comparisons for large databases.\n\nFiles: internal/embeddings/ (new package)","status":"open","priority":2,"issue_type":"task","created_at":"2025-10-28T14:48:29.072913-07:00","updated_at":"2025-10-28T14:48:29.072913-07:00","dependencies":[{"issue_id":"bd-64","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:29.07486-07:00","created_by":"daemon"}]}
|
{"id":"bd-64","title":"Add embedding generation for duplicate detection","description":"Use embeddings for scalable duplicate detection.\n\nModel: text-embedding-3-small (OpenAI) or all-MiniLM-L6-v2 (local)\nStorage: SQLite vector extension or in-memory\nCost: ~/bin/bash.0002 per 100 issues\n\nMuch cheaper than LLM comparisons for large databases.\n\nFiles: internal/embeddings/ (new package)","status":"open","priority":2,"issue_type":"task","created_at":"2025-10-28T14:48:29.072913-07:00","updated_at":"2025-10-28T14:48:29.072913-07:00","dependencies":[{"issue_id":"bd-64","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:29.07486-07:00","created_by":"daemon"}]}
|
||||||
{"id":"bd-65","title":"bd resolve-conflicts - Git merge conflict resolver","description":"Automatically resolve JSONL merge conflicts.\n\nModes:\n- Mechanical: ID remapping (no AI)\n- AI-assisted: Smart merge/keep decisions\n- Interactive: Review each conflict\n\nHandles \u003c\u003c\u003c\u003c\u003c\u003c\u003c conflict markers in .beads/beads.jsonl\n\nFiles: cmd/bd/resolve_conflicts.go (new)","status":"open","priority":1,"issue_type":"task","created_at":"2025-10-28T14:48:30.083642-07:00","updated_at":"2025-10-28T14:48:30.083642-07:00","dependencies":[{"issue_id":"bd-65","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:30.084575-07:00","created_by":"daemon"}]}
|
{"id":"bd-65","title":"bd resolve-conflicts - Git merge conflict resolver","description":"Automatically resolve JSONL merge conflicts.\n\nModes:\n- Mechanical: ID remapping (no AI)\n- AI-assisted: Smart merge/keep decisions\n- Interactive: Review each conflict\n\nHandles \u003c\u003c\u003c\u003c\u003c\u003c\u003c conflict markers in .beads/beads.jsonl\n\nFiles: cmd/bd/resolve_conflicts.go (new)","status":"open","priority":1,"issue_type":"task","created_at":"2025-10-28T14:48:30.083642-07:00","updated_at":"2025-10-28T14:48:30.083642-07:00","dependencies":[{"issue_id":"bd-65","depends_on_id":"bd-56","type":"blocks","created_at":"2025-10-28T14:48:30.084575-07:00","created_by":"daemon"}]}
|
||||||
{"id":"bd-66","title":"Add fallback to polling on watcher failure","description":"Detect fsnotify.NewWatcher() errors and log warning. Auto-switch to polling mode with 5s ticker. Add BEADS_WATCHER_FALLBACK env var to control behavior.","status":"open","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.428439-07:00","updated_at":"2025-10-28T16:20:02.428439-07:00"}
|
{"id":"bd-66","title":"Add fallback to polling on watcher failure","description":"Detect fsnotify.NewWatcher() errors and log warning. Auto-switch to polling mode with 5s ticker. Add BEADS_WATCHER_FALLBACK env var to control behavior.","status":"closed","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.428439-07:00","updated_at":"2025-10-28T19:23:43.595916-07:00","closed_at":"2025-10-28T19:23:43.595916-07:00"}
|
||||||
{"id":"bd-67","title":"Create cmd/bd/daemon_event_loop.go (~200 LOC)","description":"Implement runEventDrivenLoop to replace polling ticker. Coordinate FileWatcher, mutation events, debouncer. Include health check ticker (60s) for daemon validation.","status":"closed","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.429383-07:00","updated_at":"2025-10-28T16:20:02.429383-07:00","closed_at":"2025-10-28T12:30:44.067036-07:00"}
|
{"id":"bd-67","title":"Create cmd/bd/daemon_event_loop.go (~200 LOC)","description":"Implement runEventDrivenLoop to replace polling ticker. Coordinate FileWatcher, mutation events, debouncer. Include health check ticker (60s) for daemon validation.","status":"closed","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.429383-07:00","updated_at":"2025-10-28T16:20:02.429383-07:00","closed_at":"2025-10-28T12:30:44.067036-07:00"}
|
||||||
{"id":"bd-68","title":"Add fsnotify dependency to go.mod","description":"","status":"in_progress","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.429763-07:00","updated_at":"2025-10-28T16:20:02.429763-07:00"}
|
{"id":"bd-68","title":"Add fsnotify dependency to go.mod","description":"","status":"in_progress","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.429763-07:00","updated_at":"2025-10-28T16:20:02.429763-07:00"}
|
||||||
{"id":"bd-69","title":"Replace getStorageForRequest with Direct Access","description":"Replace all getStorageForRequest(req) calls with s.storage","acceptance_criteria":"- No references to getStorageForRequest() in codebase (except in deleted file)\n- All handlers use s.storage directly\n- Code compiles without errors\n\nFiles to update:\n- internal/rpc/server_issues_epics.go (~8 calls)\n- internal/rpc/server_labels_deps_comments.go (~4 calls)\n- internal/rpc/server_compact.go (~2 calls)\n- internal/rpc/server_export_import_auto.go (~2 calls)\n- internal/rpc/server_routing_validation_diagnostics.go (~1 call)\n\nPattern: store, err := s.getStorageForRequest(req) → store := s.storage","status":"closed","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.430127-07:00","updated_at":"2025-10-28T19:20:58.312809-07:00","closed_at":"2025-10-28T19:20:58.312809-07:00"}
|
{"id":"bd-69","title":"Replace getStorageForRequest with Direct Access","description":"Replace all getStorageForRequest(req) calls with s.storage","acceptance_criteria":"- No references to getStorageForRequest() in codebase (except in deleted file)\n- All handlers use s.storage directly\n- Code compiles without errors\n\nFiles to update:\n- internal/rpc/server_issues_epics.go (~8 calls)\n- internal/rpc/server_labels_deps_comments.go (~4 calls)\n- internal/rpc/server_compact.go (~2 calls)\n- internal/rpc/server_export_import_auto.go (~2 calls)\n- internal/rpc/server_routing_validation_diagnostics.go (~1 call)\n\nPattern: store, err := s.getStorageForRequest(req) → store := s.storage","status":"closed","priority":1,"issue_type":"task","created_at":"2025-10-28T16:20:02.430127-07:00","updated_at":"2025-10-28T19:20:58.312809-07:00","closed_at":"2025-10-28T19:20:58.312809-07:00"}
|
||||||
|
|||||||
@@ -3,49 +3,99 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileWatcher monitors JSONL and git ref changes using filesystem events.
|
// FileWatcher monitors JSONL and git ref changes using filesystem events or polling.
|
||||||
type FileWatcher struct {
|
type FileWatcher struct {
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
debouncer *Debouncer
|
debouncer *Debouncer
|
||||||
jsonlPath string
|
jsonlPath string
|
||||||
|
pollingMode bool
|
||||||
|
lastModTime time.Time
|
||||||
|
lastExists bool
|
||||||
|
lastSize int64
|
||||||
|
pollInterval time.Duration
|
||||||
|
gitRefsPath string
|
||||||
|
cancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileWatcher creates a file watcher for the given JSONL path.
|
// NewFileWatcher creates a file watcher for the given JSONL path.
|
||||||
// onChanged is called when the file or git refs change, after debouncing.
|
// onChanged is called when the file or git refs change, after debouncing.
|
||||||
|
// Falls back to polling mode if fsnotify fails (controlled by BEADS_WATCHER_FALLBACK env var).
|
||||||
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
|
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
|
||||||
watcher, err := fsnotify.NewWatcher()
|
fw := &FileWatcher{
|
||||||
if err != nil {
|
jsonlPath: jsonlPath,
|
||||||
return nil, err
|
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
|
||||||
|
pollInterval: 5 * time.Second,
|
||||||
}
|
}
|
||||||
|
|
||||||
fw := &FileWatcher{
|
// Get initial file state for polling fallback
|
||||||
watcher: watcher,
|
if stat, err := os.Stat(jsonlPath); err == nil {
|
||||||
jsonlPath: jsonlPath,
|
fw.lastModTime = stat.ModTime()
|
||||||
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
|
fw.lastExists = true
|
||||||
|
fw.lastSize = stat.Size()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if fallback is disabled
|
||||||
|
fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK")
|
||||||
|
fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0"
|
||||||
|
|
||||||
|
// Store git refs path for filtering
|
||||||
|
fw.gitRefsPath = filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads")
|
||||||
|
|
||||||
|
watcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
if fallbackDisabled {
|
||||||
|
return nil, fmt.Errorf("fsnotify.NewWatcher() failed and BEADS_WATCHER_FALLBACK is disabled: %w", err)
|
||||||
|
}
|
||||||
|
// Fall back to polling mode
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: fsnotify.NewWatcher() failed (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
|
||||||
|
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
|
||||||
|
fw.pollingMode = true
|
||||||
|
return fw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fw.watcher = watcher
|
||||||
|
|
||||||
// Watch the JSONL file
|
// Watch the JSONL file
|
||||||
if err := watcher.Add(jsonlPath); err != nil {
|
if err := watcher.Add(jsonlPath); err != nil {
|
||||||
watcher.Close()
|
watcher.Close()
|
||||||
return nil, fmt.Errorf("failed to watch JSONL: %w", err)
|
if fallbackDisabled {
|
||||||
|
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
|
||||||
|
}
|
||||||
|
// Fall back to polling mode
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
|
||||||
|
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
|
||||||
|
fw.pollingMode = true
|
||||||
|
fw.watcher = nil
|
||||||
|
return fw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also watch .git/refs/heads for branch changes (best effort)
|
// Also watch .git/refs/heads for branch changes (best effort)
|
||||||
gitRefsPath := filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads")
|
_ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this
|
||||||
_ = watcher.Add(gitRefsPath) // Ignore error - not all setups have this
|
|
||||||
|
|
||||||
return fw, nil
|
return fw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins monitoring filesystem events.
|
// Start begins monitoring filesystem events or polling.
|
||||||
// Runs in background goroutine until context is canceled.
|
// Runs in background goroutine until context is canceled.
|
||||||
|
// Should only be called once per FileWatcher instance.
|
||||||
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
||||||
|
// Create internal cancel so Close can stop goroutines
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
fw.cancel = cancel
|
||||||
|
|
||||||
|
if fw.pollingMode {
|
||||||
|
fw.startPolling(ctx, log)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@@ -68,11 +118,14 @@ func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
|||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
|
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
|
||||||
log.log("Failed to re-watch JSONL: %v", err)
|
log.log("Failed to re-watch JSONL: %v", err)
|
||||||
|
} else {
|
||||||
|
// File was recreated, trigger to reload
|
||||||
|
fw.debouncer.Trigger()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle git ref changes
|
// Handle git ref changes (only events under gitRefsPath)
|
||||||
if event.Op&fsnotify.Write != 0 && filepath.Dir(event.Name) != filepath.Dir(fw.jsonlPath) {
|
if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) {
|
||||||
log.log("Git ref change detected: %s", event.Name)
|
log.log("Git ref change detected: %s", event.Name)
|
||||||
fw.debouncer.Trigger()
|
fw.debouncer.Trigger()
|
||||||
}
|
}
|
||||||
@@ -90,8 +143,66 @@ func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startPolling begins polling for file changes using a ticker.
|
||||||
|
func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
|
||||||
|
log.log("Starting polling mode with %v interval", fw.pollInterval)
|
||||||
|
ticker := time.NewTicker(fw.pollInterval)
|
||||||
|
go func() {
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
stat, err := os.Stat(fw.jsonlPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
// File disappeared
|
||||||
|
if fw.lastExists {
|
||||||
|
fw.lastExists = false
|
||||||
|
fw.lastModTime = time.Time{}
|
||||||
|
fw.lastSize = 0
|
||||||
|
log.log("File missing (polling): %s", fw.jsonlPath)
|
||||||
|
fw.debouncer.Trigger()
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.log("Polling error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// File exists
|
||||||
|
if !fw.lastExists {
|
||||||
|
// File appeared
|
||||||
|
fw.lastExists = true
|
||||||
|
fw.lastModTime = stat.ModTime()
|
||||||
|
fw.lastSize = stat.Size()
|
||||||
|
log.log("File appeared (polling): %s", fw.jsonlPath)
|
||||||
|
fw.debouncer.Trigger()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// File exists and existed before - check for changes
|
||||||
|
if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
|
||||||
|
fw.lastModTime = stat.ModTime()
|
||||||
|
fw.lastSize = stat.Size()
|
||||||
|
log.log("File change detected (polling): %s", fw.jsonlPath)
|
||||||
|
fw.debouncer.Trigger()
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Close stops the file watcher and releases resources.
|
// Close stops the file watcher and releases resources.
|
||||||
func (fw *FileWatcher) Close() error {
|
func (fw *FileWatcher) Close() error {
|
||||||
|
// Stop background goroutines
|
||||||
|
if fw.cancel != nil {
|
||||||
|
fw.cancel()
|
||||||
|
}
|
||||||
fw.debouncer.Cancel()
|
fw.debouncer.Cancel()
|
||||||
return fw.watcher.Close()
|
if fw.watcher != nil {
|
||||||
|
return fw.watcher.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user