Files
beads/cmd/bd/daemon_watcher.go
Ryan fb65163692 fix: address critical resource leaks and error handling issues (#327)
* fix: address critical resource leaks and error handling issues

Fixes 5 critical and high-priority issues identified in codebase analysis:

1. bd-vavh: Fix row iterator resource leak in recursive dependency queries
   - Move defer rows.Close() to execute on all code paths
   - Previously leaked connections on scan errors
   - Location: internal/storage/sqlite/sqlite.go:1121-1145

2. bd-qhws: Configure database connection pool limits for daemon mode
   - Set MaxOpenConns to runtime.NumCPU() + 1 for file-based databases
   - Prevents connection exhaustion under concurrent RPC load
   - Only affects daemon mode (long-running server)
   - Location: internal/storage/sqlite/sqlite.go:108-125

3. bd-jo38: Add WaitGroup tracking to FileWatcher goroutines
   - Track goroutines with sync.WaitGroup for graceful shutdown
   - Wait for goroutines to finish before cleanup in Close()
   - Prevents race condition on debouncer access during shutdown
   - Location: cmd/bd/daemon_watcher.go (Start, startPolling, Close)

4. bd-2d5r: Fix silent error handling in RPC response writing
   - writeResponse now returns errors instead of ignoring them
   - Prevents sending partial JSON and client hangs
   - Closes connection on marshal/write errors
   - Location: internal/rpc/server_lifecycle_conn.go:227-246

5. bd-zqmb: Fix goroutine leak in daemon restart
   - Add 10-second timeout to daemon Wait() goroutine
   - Kill process if it doesn't fork within timeout
   - Prevents goroutine accumulation on restart failures
   - Location: cmd/bd/daemons.go:250-268

All changes follow Go best practices and maintain backward compatibility.

* Add feature request for .beads/README.md generation during init

Created bd-m7ge to automatically generate a promotional/documentation
README in the .beads directory when running 'bd init'. This will help
advertise Beads in open source repositories and provide quick reference
documentation for developers using AI coding agents.

The README will include:
- Brief explanation of Beads (AI-native issue tracking)
- Link to steveyegge/beads repository
- Quick reference of essential commands
- Compelling messaging to encourage adoption
2025-11-20 08:13:06 -08:00

314 lines
9.2 KiB
Go

package main
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// FileWatcher monitors JSONL and git ref changes using filesystem events or polling.
type FileWatcher struct {
watcher *fsnotify.Watcher
debouncer *Debouncer
jsonlPath string
parentDir string
pollingMode bool
lastModTime time.Time
lastExists bool
lastSize int64
pollInterval time.Duration
gitRefsPath string
gitHeadPath string
lastHeadModTime time.Time
lastHeadExists bool
cancel context.CancelFunc
wg sync.WaitGroup // Track goroutines for graceful shutdown (bd-jo38)
}
// NewFileWatcher creates a file watcher for the given JSONL path.
// onChanged is called when the file or git refs change, after debouncing.
// Falls back to polling mode if fsnotify fails (controlled by BEADS_WATCHER_FALLBACK env var).
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
fw := &FileWatcher{
jsonlPath: jsonlPath,
parentDir: filepath.Dir(jsonlPath),
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
pollInterval: 5 * time.Second,
}
// Get initial file state for polling fallback
if stat, err := os.Stat(jsonlPath); err == nil {
fw.lastModTime = stat.ModTime()
fw.lastExists = true
fw.lastSize = stat.Size()
}
// Check if fallback is disabled
fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK")
fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0"
// Store git paths for filtering
gitDir := filepath.Join(fw.parentDir, "..", ".git")
fw.gitRefsPath = filepath.Join(gitDir, "refs", "heads")
fw.gitHeadPath = filepath.Join(gitDir, "HEAD")
// Get initial git HEAD state for polling
if stat, err := os.Stat(fw.gitHeadPath); err == nil {
fw.lastHeadModTime = stat.ModTime()
fw.lastHeadExists = true
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
if fallbackDisabled {
return nil, fmt.Errorf("fsnotify.NewWatcher() failed and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: fsnotify.NewWatcher() failed (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
return fw, nil
}
fw.watcher = watcher
// Watch the parent directory (catches creates/renames)
if err := watcher.Add(fw.parentDir); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to watch parent directory %s: %v\n", fw.parentDir, err)
}
// Watch the JSONL file (may not exist yet)
if err := watcher.Add(jsonlPath); err != nil {
if os.IsNotExist(err) {
// File doesn't exist yet - rely on parent dir watch
fmt.Fprintf(os.Stderr, "Info: JSONL file %s doesn't exist yet, watching parent directory\n", jsonlPath)
} else {
_ = watcher.Close()
if fallbackDisabled {
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
}
// Fall back to polling mode
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
fw.pollingMode = true
fw.watcher = nil
return fw, nil
}
}
// Also watch .git/refs/heads and .git/HEAD for branch changes (best effort)
_ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this
_ = watcher.Add(fw.gitHeadPath) // Ignore error - not all setups have this
return fw, nil
}
// Start begins monitoring filesystem events or polling.
// Runs in background goroutine until context is canceled.
// Should only be called once per FileWatcher instance.
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
// Create internal cancel so Close can stop goroutines
ctx, cancel := context.WithCancel(ctx)
fw.cancel = cancel
if fw.pollingMode {
fw.startPolling(ctx, log)
return
}
fw.wg.Add(1)
go func() {
defer fw.wg.Done()
jsonlBase := filepath.Base(fw.jsonlPath)
for {
select {
case event, ok := <-fw.watcher.Events:
if !ok {
return
}
// Handle parent directory events (file create/replace)
if event.Name == filepath.Join(fw.parentDir, jsonlBase) && event.Op&fsnotify.Create != 0 {
log.log("JSONL file created: %s", event.Name)
// Ensure we're watching the file directly
_ = fw.watcher.Add(fw.jsonlPath)
fw.debouncer.Trigger()
continue
}
// Handle JSONL write/chmod events
if event.Name == fw.jsonlPath && event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Chmod) != 0 {
log.log("File change detected: %s (op: %v)", event.Name, event.Op)
fw.debouncer.Trigger()
continue
}
// Handle JSONL removal/rename (e.g., git checkout)
if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) {
log.log("JSONL removed/renamed, re-establishing watch")
_ = fw.watcher.Remove(fw.jsonlPath)
// Retry with exponential backoff
fw.reEstablishWatch(ctx, log)
continue
}
// Handle .git/HEAD changes (branch switches)
if event.Name == fw.gitHeadPath && event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
log.log("Git HEAD change detected: %s", event.Name)
fw.debouncer.Trigger()
continue
}
// Handle git ref changes (only events under gitRefsPath)
if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) {
log.log("Git ref change detected: %s", event.Name)
fw.debouncer.Trigger()
continue
}
case err, ok := <-fw.watcher.Errors:
if !ok {
return
}
log.log("Watcher error: %v", err)
case <-ctx.Done():
return
}
}
}()
}
// reEstablishWatch attempts to re-add the JSONL watch with exponential backoff.
func (fw *FileWatcher) reEstablishWatch(ctx context.Context, log daemonLogger) {
delays := []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond}
for _, delay := range delays {
select {
case <-ctx.Done():
return
case <-time.After(delay):
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
if os.IsNotExist(err) {
log.log("JSONL still missing after %v, retrying...", delay)
continue
}
log.log("Failed to re-watch JSONL after %v: %v", delay, err)
return
}
// Success!
log.log("Successfully re-established JSONL watch after %v", delay)
fw.debouncer.Trigger()
return
}
}
log.log("Failed to re-establish JSONL watch after all retries")
}
// startPolling begins polling for file changes using a ticker.
func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
log.log("Starting polling mode with %v interval", fw.pollInterval)
ticker := time.NewTicker(fw.pollInterval)
fw.wg.Add(1)
go func() {
defer fw.wg.Done()
defer ticker.Stop()
for {
select {
case <-ticker.C:
changed := false
// Check JSONL file
stat, err := os.Stat(fw.jsonlPath)
if err != nil {
if os.IsNotExist(err) {
// File disappeared
if fw.lastExists {
fw.lastExists = false
fw.lastModTime = time.Time{}
fw.lastSize = 0
log.log("File missing (polling): %s", fw.jsonlPath)
changed = true
}
} else {
log.log("Polling error: %v", err)
}
} else {
// File exists
if !fw.lastExists {
// File appeared
fw.lastExists = true
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File appeared (polling): %s", fw.jsonlPath)
changed = true
} else if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
// File exists and existed before - check for changes
fw.lastModTime = stat.ModTime()
fw.lastSize = stat.Size()
log.log("File change detected (polling): %s", fw.jsonlPath)
changed = true
}
}
// Check .git/HEAD for branch changes
headStat, err := os.Stat(fw.gitHeadPath)
if err != nil {
if os.IsNotExist(err) {
if fw.lastHeadExists {
fw.lastHeadExists = false
fw.lastHeadModTime = time.Time{}
log.log("Git HEAD missing (polling): %s", fw.gitHeadPath)
changed = true
}
}
// Ignore other errors for HEAD - it's optional
} else {
// HEAD exists
if !fw.lastHeadExists {
// HEAD appeared
fw.lastHeadExists = true
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD appeared (polling): %s", fw.gitHeadPath)
changed = true
} else if !headStat.ModTime().Equal(fw.lastHeadModTime) {
// HEAD changed (branch switch)
fw.lastHeadModTime = headStat.ModTime()
log.log("Git HEAD change detected (polling): %s", fw.gitHeadPath)
changed = true
}
}
if changed {
fw.debouncer.Trigger()
}
case <-ctx.Done():
return
}
}
}()
}
// Close stops the file watcher and releases resources.
func (fw *FileWatcher) Close() error {
// Stop background goroutines
if fw.cancel != nil {
fw.cancel()
}
// Wait for goroutines to finish before cleanup (bd-jo38)
fw.wg.Wait()
fw.debouncer.Cancel()
if fw.watcher != nil {
return fw.watcher.Close()
}
return nil
}