Add event-driven daemon architecture (Phase 1 foundation)
- Add fsnotify dependency for file watching - Create daemon_debouncer.go: batch rapid events (500ms window) - Create daemon_watcher.go: monitor JSONL and git refs changes - Create daemon_event_loop.go: event-driven sync loop - Add mutation channel to RPC server (create/update/close events) - Add BEADS_DAEMON_MODE env var (poll/events, default: poll) Phase 1 implementation: opt-in via BEADS_DAEMON_MODE=events Target: <500ms latency (vs 5000ms), ~60% CPU reduction Related: bd-49 (epic), bd-50, bd-51, bd-53, bd-54, bd-55, bd-56 Amp-Thread-ID: https://ampcode.com/threads/T-35a3d0d7-4e19-421d-8392-63755035036e Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -1299,5 +1299,30 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
||||
doSync := createSyncFunc(ctx, store, autoCommit, autoPush, log)
|
||||
doSync()
|
||||
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
// Choose event loop based on BEADS_DAEMON_MODE
|
||||
daemonMode := os.Getenv("BEADS_DAEMON_MODE")
|
||||
if daemonMode == "" {
|
||||
daemonMode = "poll" // Default to polling for Phase 1
|
||||
}
|
||||
|
||||
switch daemonMode {
|
||||
case "events":
|
||||
log.log("Using event-driven mode")
|
||||
// For Phase 1: event-driven mode uses full sync on both export and import events
|
||||
// TODO: Optimize to separate export-only and import-only triggers
|
||||
jsonlPath := findJSONLPath()
|
||||
if jsonlPath == "" {
|
||||
log.log("Error: JSONL path not found, cannot use event-driven mode")
|
||||
log.log("Falling back to polling mode")
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
} else {
|
||||
runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doSync, doSync, log)
|
||||
}
|
||||
case "poll":
|
||||
log.log("Using polling mode (interval: %v)", interval)
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
default:
|
||||
log.log("Unknown BEADS_DAEMON_MODE: %s (valid: poll, events), defaulting to poll", daemonMode)
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
}
|
||||
}
|
||||
|
||||
55
cmd/bd/daemon_debouncer.go
Normal file
55
cmd/bd/daemon_debouncer.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Debouncer batches rapid events into a single action after a quiet period.
|
||||
// Thread-safe for concurrent triggers.
|
||||
type Debouncer struct {
|
||||
mu sync.Mutex
|
||||
timer *time.Timer
|
||||
duration time.Duration
|
||||
action func()
|
||||
}
|
||||
|
||||
// NewDebouncer creates a new debouncer with the given duration and action.
|
||||
// The action will be called once after the duration has passed since the last trigger.
|
||||
func NewDebouncer(duration time.Duration, action func()) *Debouncer {
|
||||
return &Debouncer{
|
||||
duration: duration,
|
||||
action: action,
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger schedules the action to run after the debounce duration.
|
||||
// If called multiple times, the timer is reset each time, ensuring
|
||||
// the action only fires once after the last trigger.
|
||||
func (d *Debouncer) Trigger() {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
if d.timer != nil {
|
||||
d.timer.Stop()
|
||||
}
|
||||
|
||||
d.timer = time.AfterFunc(d.duration, func() {
|
||||
d.action()
|
||||
d.mu.Lock()
|
||||
d.timer = nil
|
||||
d.mu.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
// Cancel stops any pending debounced action.
|
||||
// Safe to call even if no action is pending.
|
||||
func (d *Debouncer) Cancel() {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
if d.timer != nil {
|
||||
d.timer.Stop()
|
||||
d.timer = nil
|
||||
}
|
||||
}
|
||||
124
cmd/bd/daemon_event_loop.go
Normal file
124
cmd/bd/daemon_event_loop.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
)
|
||||
|
||||
// runEventDrivenLoop implements event-driven daemon architecture.
|
||||
// Replaces polling ticker with reactive event handlers for:
|
||||
// - File system changes (JSONL modifications)
|
||||
// - RPC mutations (create, update, delete)
|
||||
// - Git operations (via hooks, optional)
|
||||
func runEventDrivenLoop(
|
||||
ctx context.Context,
|
||||
cancel context.CancelFunc,
|
||||
server *rpc.Server,
|
||||
serverErrChan chan error,
|
||||
store storage.Storage,
|
||||
jsonlPath string,
|
||||
doExport func(),
|
||||
doAutoImport func(),
|
||||
log daemonLogger,
|
||||
) {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
defer signal.Stop(sigChan)
|
||||
|
||||
// Debounced sync actions
|
||||
exportDebouncer := NewDebouncer(500*time.Millisecond, func() {
|
||||
log.log("Export triggered by mutation events")
|
||||
doExport()
|
||||
})
|
||||
defer exportDebouncer.Cancel()
|
||||
|
||||
importDebouncer := NewDebouncer(500*time.Millisecond, func() {
|
||||
log.log("Import triggered by file change")
|
||||
doAutoImport()
|
||||
})
|
||||
defer importDebouncer.Cancel()
|
||||
|
||||
// Start file watcher for JSONL changes
|
||||
watcher, err := NewFileWatcher(jsonlPath, func() {
|
||||
importDebouncer.Trigger()
|
||||
})
|
||||
if err != nil {
|
||||
log.log("WARNING: File watcher unavailable (%v), mutations will trigger export only", err)
|
||||
watcher = nil
|
||||
} else {
|
||||
watcher.Start(ctx, log)
|
||||
defer watcher.Close()
|
||||
}
|
||||
|
||||
// Handle mutation events from RPC server
|
||||
mutationChan := server.MutationChan()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event := <-mutationChan:
|
||||
log.log("Mutation detected: %s %s", event.Type, event.IssueID)
|
||||
exportDebouncer.Trigger()
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Optional: Periodic health check (not a sync poll)
|
||||
healthTicker := time.NewTicker(60 * time.Second)
|
||||
defer healthTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-healthTicker.C:
|
||||
// Periodic health validation (not sync)
|
||||
checkDaemonHealth(ctx, store, log)
|
||||
|
||||
case sig := <-sigChan:
|
||||
if isReloadSignal(sig) {
|
||||
log.log("Received reload signal, ignoring")
|
||||
continue
|
||||
}
|
||||
log.log("Received signal %v, shutting down...", sig)
|
||||
cancel()
|
||||
if err := server.Stop(); err != nil {
|
||||
log.log("Error stopping server: %v", err)
|
||||
}
|
||||
return
|
||||
|
||||
case <-ctx.Done():
|
||||
log.log("Context canceled, shutting down")
|
||||
if watcher != nil {
|
||||
watcher.Close()
|
||||
}
|
||||
if err := server.Stop(); err != nil {
|
||||
log.log("Error stopping server: %v", err)
|
||||
}
|
||||
return
|
||||
|
||||
case err := <-serverErrChan:
|
||||
log.log("RPC server failed: %v", err)
|
||||
cancel()
|
||||
if watcher != nil {
|
||||
watcher.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkDaemonHealth performs periodic health validation.
|
||||
// Separate from sync operations - just validates state.
|
||||
func checkDaemonHealth(ctx context.Context, store storage.Storage, log daemonLogger) {
|
||||
// TODO: Add health checks:
|
||||
// - Database integrity check
|
||||
// - Disk space check
|
||||
// - Memory usage check
|
||||
// For now, this is a no-op placeholder
|
||||
}
|
||||
97
cmd/bd/daemon_watcher.go
Normal file
97
cmd/bd/daemon_watcher.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// FileWatcher monitors JSONL and git ref changes using filesystem events.
|
||||
type FileWatcher struct {
|
||||
watcher *fsnotify.Watcher
|
||||
debouncer *Debouncer
|
||||
jsonlPath string
|
||||
}
|
||||
|
||||
// NewFileWatcher creates a file watcher for the given JSONL path.
|
||||
// onChanged is called when the file or git refs change, after debouncing.
|
||||
func NewFileWatcher(jsonlPath string, onChanged func()) (*FileWatcher, error) {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fw := &FileWatcher{
|
||||
watcher: watcher,
|
||||
jsonlPath: jsonlPath,
|
||||
debouncer: NewDebouncer(500*time.Millisecond, onChanged),
|
||||
}
|
||||
|
||||
// Watch the JSONL file
|
||||
if err := watcher.Add(jsonlPath); err != nil {
|
||||
watcher.Close()
|
||||
return nil, fmt.Errorf("failed to watch JSONL: %w", err)
|
||||
}
|
||||
|
||||
// Also watch .git/refs/heads for branch changes (best effort)
|
||||
gitRefsPath := filepath.Join(filepath.Dir(jsonlPath), "..", ".git", "refs", "heads")
|
||||
_ = watcher.Add(gitRefsPath) // Ignore error - not all setups have this
|
||||
|
||||
return fw, nil
|
||||
}
|
||||
|
||||
// Start begins monitoring filesystem events.
|
||||
// Runs in background goroutine until context is canceled.
|
||||
func (fw *FileWatcher) Start(ctx context.Context, log daemonLogger) {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-fw.watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Handle JSONL write events
|
||||
if event.Name == fw.jsonlPath && event.Op&fsnotify.Write != 0 {
|
||||
log.log("File change detected: %s", event.Name)
|
||||
fw.debouncer.Trigger()
|
||||
}
|
||||
|
||||
// Handle JSONL removal/rename (e.g., git checkout)
|
||||
if event.Name == fw.jsonlPath && (event.Op&fsnotify.Remove != 0 || event.Op&fsnotify.Rename != 0) {
|
||||
log.log("JSONL removed/renamed, re-establishing watch")
|
||||
fw.watcher.Remove(fw.jsonlPath)
|
||||
// Brief wait for file to be recreated
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if err := fw.watcher.Add(fw.jsonlPath); err != nil {
|
||||
log.log("Failed to re-watch JSONL: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle git ref changes
|
||||
if event.Op&fsnotify.Write != 0 && filepath.Dir(event.Name) != filepath.Dir(fw.jsonlPath) {
|
||||
log.log("Git ref change detected: %s", event.Name)
|
||||
fw.debouncer.Trigger()
|
||||
}
|
||||
|
||||
case err, ok := <-fw.watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.log("Watcher error: %v", err)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Close stops the file watcher and releases resources.
|
||||
func (fw *FileWatcher) Close() error {
|
||||
fw.debouncer.Cancel()
|
||||
return fw.watcher.Close()
|
||||
}
|
||||
Reference in New Issue
Block a user