Files
beads/cmd/bd/daemon_event_loop.go
Steve Yegge fea86f9b31 Implement event-driven daemon improvements for bd-85
- Add mutation events for label/dep/comment operations
- Create separate export-only and import-only functions
- Add dropped events counter with safety net export
- Complete bd-80 mutation channel implementation

Event-driven mode now:
- Emits mutation events for ALL write operations (not just create/update/close)
- Uses createExportFunc() for mutations (export+commit/push only, no pull)
- Uses createAutoImportFunc() for file changes (pull+import only, no export)
- Tracks dropped events and triggers safety export every 60s if any dropped
- Achieves <500ms latency target by avoiding full sync on each trigger

Behind BEADS_DAEMON_MODE=events flag (poll is still default)
2025-10-29 11:22:29 -07:00

132 lines
3.2 KiB
Go

package main
import (
"context"
"os"
"os/signal"
"time"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/storage"
)
// runEventDrivenLoop implements event-driven daemon architecture.
// Replaces polling ticker with reactive event handlers for:
// - File system changes (JSONL modifications)
// - RPC mutations (create, update, delete)
// - Git operations (via hooks, optional)
func runEventDrivenLoop(
ctx context.Context,
cancel context.CancelFunc,
server *rpc.Server,
serverErrChan chan error,
store storage.Storage,
jsonlPath string,
doExport func(),
doAutoImport func(),
log daemonLogger,
) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, daemonSignals...)
defer signal.Stop(sigChan)
// Debounced sync actions
exportDebouncer := NewDebouncer(500*time.Millisecond, func() {
log.log("Export triggered by mutation events")
doExport()
})
defer exportDebouncer.Cancel()
importDebouncer := NewDebouncer(500*time.Millisecond, func() {
log.log("Import triggered by file change")
doAutoImport()
})
defer importDebouncer.Cancel()
// Start file watcher for JSONL changes
watcher, err := NewFileWatcher(jsonlPath, func() {
importDebouncer.Trigger()
})
if err != nil {
log.log("WARNING: File watcher unavailable (%v), mutations will trigger export only", err)
watcher = nil
} else {
watcher.Start(ctx, log)
defer watcher.Close()
}
// Handle mutation events from RPC server
mutationChan := server.MutationChan()
go func() {
for {
select {
case event := <-mutationChan:
log.log("Mutation detected: %s %s", event.Type, event.IssueID)
exportDebouncer.Trigger()
case <-ctx.Done():
return
}
}
}()
// Optional: Periodic health check and dropped events safety net
healthTicker := time.NewTicker(60 * time.Second)
defer healthTicker.Stop()
for {
select {
case <-healthTicker.C:
// Periodic health validation (not sync)
checkDaemonHealth(ctx, store, log)
// Safety net: check for dropped mutation events
dropped := server.ResetDroppedEventsCount()
if dropped > 0 {
log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
exportDebouncer.Trigger()
}
case sig := <-sigChan:
if isReloadSignal(sig) {
log.log("Received reload signal, ignoring")
continue
}
log.log("Received signal %v, shutting down...", sig)
cancel()
if err := server.Stop(); err != nil {
log.log("Error stopping server: %v", err)
}
return
case <-ctx.Done():
log.log("Context canceled, shutting down")
if watcher != nil {
watcher.Close()
}
if err := server.Stop(); err != nil {
log.log("Error stopping server: %v", err)
}
return
case err := <-serverErrChan:
log.log("RPC server failed: %v", err)
cancel()
if watcher != nil {
watcher.Close()
}
return
}
}
}
// checkDaemonHealth performs periodic health validation.
// Separate from sync operations - just validates state.
func checkDaemonHealth(ctx context.Context, store storage.Storage, log daemonLogger) {
// TODO: Add health checks:
// - Database integrity check
// - Disk space check
// - Memory usage check
// For now, this is a no-op placeholder
}