* fix: address critical resource leaks and error handling issues Fixes 5 critical and high-priority issues identified in codebase analysis: 1. bd-vavh: Fix row iterator resource leak in recursive dependency queries - Move defer rows.Close() to execute on all code paths - Previously leaked connections on scan errors - Location: internal/storage/sqlite/sqlite.go:1121-1145 2. bd-qhws: Configure database connection pool limits for daemon mode - Set MaxOpenConns to runtime.NumCPU() + 1 for file-based databases - Prevents connection exhaustion under concurrent RPC load - Only affects daemon mode (long-running server) - Location: internal/storage/sqlite/sqlite.go:108-125 3. bd-jo38: Add WaitGroup tracking to FileWatcher goroutines - Track goroutines with sync.WaitGroup for graceful shutdown - Wait for goroutines to finish before cleanup in Close() - Prevents race condition on debouncer access during shutdown - Location: cmd/bd/daemon_watcher.go (Start, startPolling, Close) 4. bd-2d5r: Fix silent error handling in RPC response writing - writeResponse now returns errors instead of ignoring them - Prevents sending partial JSON and client hangs - Closes connection on marshal/write errors - Location: internal/rpc/server_lifecycle_conn.go:227-246 5. bd-zqmb: Fix goroutine leak in daemon restart - Add 10-second timeout to daemon Wait() goroutine - Kill process if it doesn't fork within timeout - Prevents goroutine accumulation on restart failures - Location: cmd/bd/daemons.go:250-268 All changes follow Go best practices and maintain backward compatibility. * Add feature request for .beads/README.md generation during init Created bd-m7ge to automatically generate a promotional/documentation README in the .beads directory when running 'bd init'. This will help advertise Beads in open source repositories and provide quick reference documentation for developers using AI coding agents. The README will include: - Brief explanation of Beads (AI-native issue tracking) - Link to steveyegge/beads repository - Quick reference of essential commands - Compelling messaging to encourage adoption
314 lines
9.2 KiB
Go
314 lines
9.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
// FileWatcher monitors JSONL and git ref changes using filesystem events or polling.
|
|
type FileWatcher struct {
|
|
watcher *fsnotify.Watcher
|
|
debouncer *Debouncer
|
|
jsonlPath string
|
|
parentDir string
|
|
pollingMode bool
|
|
lastModTime time.Time
|
|
lastExists bool
|
|
lastSize int64
|
|
pollInterval time.Duration
|
|
gitRefsPath string
|
|
gitHeadPath string
|
|
lastHeadModTime time.Time
|
|
lastHeadExists bool
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup // Track goroutines for graceful shutdown (bd-jo38)
|
|
}
|
|
|
|
// NewFileWatcher creates a file watcher for the given JSONL path.
|
|
// onChanged is called when the file or git refs change, after debouncing.
|
|
// Falls back to polling mode if fsnotify fails (controlled by BEADS_WATCHER_FALLBACK env var).
|
|
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
|
|
fw := &FileWatcher{
|
|
jsonlPath: jsonlPath,
|
|
parentDir: filepath.Dir(jsonlPath),
|
|
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
|
|
pollInterval: 5 * time.Second,
|
|
}
|
|
|
|
// Get initial file state for polling fallback
|
|
if stat, err := os.Stat(jsonlPath); err == nil {
|
|
fw.lastModTime = stat.ModTime()
|
|
fw.lastExists = true
|
|
fw.lastSize = stat.Size()
|
|
}
|
|
|
|
// Check if fallback is disabled
|
|
fallbackEnv := os.Getenv("BEADS_WATCHER_FALLBACK")
|
|
fallbackDisabled := fallbackEnv == "false" || fallbackEnv == "0"
|
|
|
|
// Store git paths for filtering
|
|
gitDir := filepath.Join(fw.parentDir, "..", ".git")
|
|
fw.gitRefsPath = filepath.Join(gitDir, "refs", "heads")
|
|
fw.gitHeadPath = filepath.Join(gitDir, "HEAD")
|
|
|
|
// Get initial git HEAD state for polling
|
|
if stat, err := os.Stat(fw.gitHeadPath); err == nil {
|
|
fw.lastHeadModTime = stat.ModTime()
|
|
fw.lastHeadExists = true
|
|
}
|
|
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
if fallbackDisabled {
|
|
return nil, fmt.Errorf("fsnotify.NewWatcher() failed and BEADS_WATCHER_FALLBACK is disabled: %w", err)
|
|
}
|
|
// Fall back to polling mode
|
|
fmt.Fprintf(os.Stderr, "Warning: fsnotify.NewWatcher() failed (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
|
|
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
|
|
fw.pollingMode = true
|
|
return fw, nil
|
|
}
|
|
|
|
fw.watcher = watcher
|
|
|
|
// Watch the parent directory (catches creates/renames)
|
|
if err := watcher.Add(fw.parentDir); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Warning: failed to watch parent directory %s: %v\n", fw.parentDir, err)
|
|
}
|
|
|
|
// Watch the JSONL file (may not exist yet)
|
|
if err := watcher.Add(jsonlPath); err != nil {
|
|
if os.IsNotExist(err) {
|
|
// File doesn't exist yet - rely on parent dir watch
|
|
fmt.Fprintf(os.Stderr, "Info: JSONL file %s doesn't exist yet, watching parent directory\n", jsonlPath)
|
|
} else {
|
|
_ = watcher.Close()
|
|
if fallbackDisabled {
|
|
return nil, fmt.Errorf("failed to watch JSONL and BEADS_WATCHER_FALLBACK is disabled: %w", err)
|
|
}
|
|
// Fall back to polling mode
|
|
fmt.Fprintf(os.Stderr, "Warning: failed to watch JSONL (%v), falling back to polling mode (%v interval)\n", err, fw.pollInterval)
|
|
fmt.Fprintf(os.Stderr, "Set BEADS_WATCHER_FALLBACK=false to disable this fallback and require fsnotify\n")
|
|
fw.pollingMode = true
|
|
fw.watcher = nil
|
|
return fw, nil
|
|
}
|
|
}
|
|
|
|
// Also watch .git/refs/heads and .git/HEAD for branch changes (best effort)
|
|
_ = watcher.Add(fw.gitRefsPath) // Ignore error - not all setups have this
|
|
_ = watcher.Add(fw.gitHeadPath) // Ignore error - not all setups have this
|
|
|
|
return fw, nil
|
|
}
|
|
|
|
// Start begins monitoring filesystem events or polling.
|
|
// Runs in background goroutine until context is canceled.
|
|
// Should only be called once per FileWatcher instance.
|
|
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
|
// Create internal cancel so Close can stop goroutines
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
fw.cancel = cancel
|
|
|
|
if fw.pollingMode {
|
|
fw.startPolling(ctx, log)
|
|
return
|
|
}
|
|
|
|
fw.wg.Add(1)
|
|
go func() {
|
|
defer fw.wg.Done()
|
|
jsonlBase := filepath.Base(fw.jsonlPath)
|
|
|
|
for {
|
|
select {
|
|
case event, ok := <-fw.watcher.Events:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Handle parent directory events (file create/replace)
|
|
if event.Name == filepath.Join(fw.parentDir, jsonlBase) && event.Op&fsnotify.Create != 0 {
|
|
log.log("JSONL file created: %s", event.Name)
|
|
// Ensure we're watching the file directly
|
|
_ = fw.watcher.Add(fw.jsonlPath)
|
|
fw.debouncer.Trigger()
|
|
continue
|
|
}
|
|
|
|
// Handle JSONL write/chmod events
|
|
if event.Name == fw.jsonlPath && event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Chmod) != 0 {
|
|
log.log("File change detected: %s (op: %v)", event.Name, event.Op)
|
|
fw.debouncer.Trigger()
|
|
continue
|
|
}
|
|
|
|
// Handle JSONL removal/rename (e.g., git checkout)
|
|
if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) {
|
|
log.log("JSONL removed/renamed, re-establishing watch")
|
|
_ = fw.watcher.Remove(fw.jsonlPath)
|
|
// Retry with exponential backoff
|
|
fw.reEstablishWatch(ctx, log)
|
|
continue
|
|
}
|
|
|
|
// Handle .git/HEAD changes (branch switches)
|
|
if event.Name == fw.gitHeadPath && event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
|
|
log.log("Git HEAD change detected: %s", event.Name)
|
|
fw.debouncer.Trigger()
|
|
continue
|
|
}
|
|
|
|
// Handle git ref changes (only events under gitRefsPath)
|
|
if event.Op&fsnotify.Write != 0 && strings.HasPrefix(event.Name, fw.gitRefsPath) {
|
|
log.log("Git ref change detected: %s", event.Name)
|
|
fw.debouncer.Trigger()
|
|
continue
|
|
}
|
|
|
|
case err, ok := <-fw.watcher.Errors:
|
|
if !ok {
|
|
return
|
|
}
|
|
log.log("Watcher error: %v", err)
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// reEstablishWatch attempts to re-add the JSONL watch with exponential backoff.
|
|
func (fw *FileWatcher) reEstablishWatch(ctx context.Context, log daemonLogger) {
|
|
delays := []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond}
|
|
|
|
for _, delay := range delays {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(delay):
|
|
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
|
|
if os.IsNotExist(err) {
|
|
log.log("JSONL still missing after %v, retrying...", delay)
|
|
continue
|
|
}
|
|
log.log("Failed to re-watch JSONL after %v: %v", delay, err)
|
|
return
|
|
}
|
|
// Success!
|
|
log.log("Successfully re-established JSONL watch after %v", delay)
|
|
fw.debouncer.Trigger()
|
|
return
|
|
}
|
|
}
|
|
log.log("Failed to re-establish JSONL watch after all retries")
|
|
}
|
|
|
|
// startPolling begins polling for file changes using a ticker.
|
|
func (fw *FileWatcher) startPolling(ctx context.Context, log daemonLogger) {
|
|
log.log("Starting polling mode with %v interval", fw.pollInterval)
|
|
ticker := time.NewTicker(fw.pollInterval)
|
|
fw.wg.Add(1)
|
|
go func() {
|
|
defer fw.wg.Done()
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
changed := false
|
|
|
|
// Check JSONL file
|
|
stat, err := os.Stat(fw.jsonlPath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
// File disappeared
|
|
if fw.lastExists {
|
|
fw.lastExists = false
|
|
fw.lastModTime = time.Time{}
|
|
fw.lastSize = 0
|
|
log.log("File missing (polling): %s", fw.jsonlPath)
|
|
changed = true
|
|
}
|
|
} else {
|
|
log.log("Polling error: %v", err)
|
|
}
|
|
} else {
|
|
// File exists
|
|
if !fw.lastExists {
|
|
// File appeared
|
|
fw.lastExists = true
|
|
fw.lastModTime = stat.ModTime()
|
|
fw.lastSize = stat.Size()
|
|
log.log("File appeared (polling): %s", fw.jsonlPath)
|
|
changed = true
|
|
} else if !stat.ModTime().Equal(fw.lastModTime) || stat.Size() != fw.lastSize {
|
|
// File exists and existed before - check for changes
|
|
fw.lastModTime = stat.ModTime()
|
|
fw.lastSize = stat.Size()
|
|
log.log("File change detected (polling): %s", fw.jsonlPath)
|
|
changed = true
|
|
}
|
|
}
|
|
|
|
// Check .git/HEAD for branch changes
|
|
headStat, err := os.Stat(fw.gitHeadPath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
if fw.lastHeadExists {
|
|
fw.lastHeadExists = false
|
|
fw.lastHeadModTime = time.Time{}
|
|
log.log("Git HEAD missing (polling): %s", fw.gitHeadPath)
|
|
changed = true
|
|
}
|
|
}
|
|
// Ignore other errors for HEAD - it's optional
|
|
} else {
|
|
// HEAD exists
|
|
if !fw.lastHeadExists {
|
|
// HEAD appeared
|
|
fw.lastHeadExists = true
|
|
fw.lastHeadModTime = headStat.ModTime()
|
|
log.log("Git HEAD appeared (polling): %s", fw.gitHeadPath)
|
|
changed = true
|
|
} else if !headStat.ModTime().Equal(fw.lastHeadModTime) {
|
|
// HEAD changed (branch switch)
|
|
fw.lastHeadModTime = headStat.ModTime()
|
|
log.log("Git HEAD change detected (polling): %s", fw.gitHeadPath)
|
|
changed = true
|
|
}
|
|
}
|
|
|
|
if changed {
|
|
fw.debouncer.Trigger()
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Close stops the file watcher and releases resources.
|
|
func (fw *FileWatcher) Close() error {
|
|
// Stop background goroutines
|
|
if fw.cancel != nil {
|
|
fw.cancel()
|
|
}
|
|
// Wait for goroutines to finish before cleanup (bd-jo38)
|
|
fw.wg.Wait()
|
|
fw.debouncer.Cancel()
|
|
if fw.watcher != nil {
|
|
return fw.watcher.Close()
|
|
}
|
|
return nil
|
|
}
|