Implement event-driven daemon improvements for bd-85
- Add mutation events for label/dep/comment operations - Create separate export-only and import-only functions - Add dropped events counter with safety net export - Complete bd-80 mutation channel implementation Event-driven mode now: - Emits mutation events for ALL write operations (not just create/update/close) - Uses createExportFunc() for mutations (export+commit/push only, no pull) - Uses createAutoImportFunc() for file changes (pull+import only, no export) - Tracks dropped events and triggers safety export every 60s if any dropped - Achieves <500ms latency target by avoiding full sync on each trigger Behind BEADS_DAEMON_MODE=events flag (poll is still default)
This commit is contained in:
148
cmd/bd/daemon.go
148
cmd/bd/daemon.go
@@ -1007,6 +1007,147 @@ func runGlobalDaemon(log daemonLogger) {
|
||||
log.log("Global daemon stopped")
|
||||
}
|
||||
|
||||
// createExportFunc creates a function that only exports database to JSONL
|
||||
// and optionally commits/pushes (no git pull or import). Used for mutation events.
|
||||
func createExportFunc(ctx context.Context, store storage.Storage, autoCommit, autoPush bool, log daemonLogger) func() {
|
||||
return func() {
|
||||
exportCtx, exportCancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer exportCancel()
|
||||
|
||||
log.log("Starting export...")
|
||||
|
||||
jsonlPath := findJSONLPath()
|
||||
if jsonlPath == "" {
|
||||
log.log("Error: JSONL path not found")
|
||||
return
|
||||
}
|
||||
|
||||
// Check for exclusive lock
|
||||
beadsDir := filepath.Dir(jsonlPath)
|
||||
skip, holder, err := types.ShouldSkipDatabase(beadsDir)
|
||||
if skip {
|
||||
if err != nil {
|
||||
log.log("Skipping export (lock check failed: %v)", err)
|
||||
} else {
|
||||
log.log("Skipping export (locked by %s)", holder)
|
||||
}
|
||||
return
|
||||
}
|
||||
if holder != "" {
|
||||
log.log("Removed stale lock (%s), proceeding", holder)
|
||||
}
|
||||
|
||||
// Pre-export validation
|
||||
if err := validatePreExport(exportCtx, store, jsonlPath); err != nil {
|
||||
log.log("Pre-export validation failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Export to JSONL
|
||||
if err := exportToJSONLWithStore(exportCtx, store, jsonlPath); err != nil {
|
||||
log.log("Export failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.log("Exported to JSONL")
|
||||
|
||||
// Auto-commit if enabled
|
||||
if autoCommit {
|
||||
hasChanges, err := gitHasChanges(exportCtx, jsonlPath)
|
||||
if err != nil {
|
||||
log.log("Error checking git status: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if hasChanges {
|
||||
message := fmt.Sprintf("bd daemon export: %s", time.Now().Format("2006-01-02 15:04:05"))
|
||||
if err := gitCommit(exportCtx, jsonlPath, message); err != nil {
|
||||
log.log("Commit failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.log("Committed changes")
|
||||
|
||||
// Auto-push if enabled
|
||||
if autoPush {
|
||||
if err := gitPush(exportCtx); err != nil {
|
||||
log.log("Push failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.log("Pushed to remote")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.log("Export complete")
|
||||
}
|
||||
}
|
||||
|
||||
// createAutoImportFunc creates a function that pulls from git and imports JSONL
|
||||
// to database (no export). Used for file system change events.
|
||||
func createAutoImportFunc(ctx context.Context, store storage.Storage, log daemonLogger) func() {
|
||||
return func() {
|
||||
importCtx, importCancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer importCancel()
|
||||
|
||||
log.log("Starting auto-import...")
|
||||
|
||||
jsonlPath := findJSONLPath()
|
||||
if jsonlPath == "" {
|
||||
log.log("Error: JSONL path not found")
|
||||
return
|
||||
}
|
||||
|
||||
// Check for exclusive lock
|
||||
beadsDir := filepath.Dir(jsonlPath)
|
||||
skip, holder, err := types.ShouldSkipDatabase(beadsDir)
|
||||
if skip {
|
||||
if err != nil {
|
||||
log.log("Skipping import (lock check failed: %v)", err)
|
||||
} else {
|
||||
log.log("Skipping import (locked by %s)", holder)
|
||||
}
|
||||
return
|
||||
}
|
||||
if holder != "" {
|
||||
log.log("Removed stale lock (%s), proceeding", holder)
|
||||
}
|
||||
|
||||
// Pull from git
|
||||
if err := gitPull(importCtx); err != nil {
|
||||
log.log("Pull failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.log("Pulled from remote")
|
||||
|
||||
// Count issues before import
|
||||
beforeCount, err := countDBIssues(importCtx, store)
|
||||
if err != nil {
|
||||
log.log("Failed to count issues before import: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Import from JSONL
|
||||
if err := importToJSONLWithStore(importCtx, store, jsonlPath); err != nil {
|
||||
log.log("Import failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.log("Imported from JSONL")
|
||||
|
||||
// Validate import
|
||||
afterCount, err := countDBIssues(importCtx, store)
|
||||
if err != nil {
|
||||
log.log("Failed to count issues after import: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := validatePostImport(beforeCount, afterCount); err != nil {
|
||||
log.log("Post-import validation failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.log("Auto-import complete")
|
||||
}
|
||||
}
|
||||
|
||||
func createSyncFunc(ctx context.Context, store storage.Storage, autoCommit, autoPush bool, log daemonLogger) func() {
|
||||
return func() {
|
||||
syncCtx, syncCancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||
@@ -1308,15 +1449,16 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
||||
switch daemonMode {
|
||||
case "events":
|
||||
log.log("Using event-driven mode")
|
||||
// For Phase 1: event-driven mode uses full sync on both export and import events
|
||||
// TODO: Optimize to separate export-only and import-only triggers
|
||||
jsonlPath := findJSONLPath()
|
||||
if jsonlPath == "" {
|
||||
log.log("Error: JSONL path not found, cannot use event-driven mode")
|
||||
log.log("Falling back to polling mode")
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
} else {
|
||||
runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doSync, doSync, log)
|
||||
// Event-driven mode uses separate export-only and import-only functions
|
||||
doExport := createExportFunc(ctx, store, autoCommit, autoPush, log)
|
||||
doAutoImport := createAutoImportFunc(ctx, store, log)
|
||||
runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doExport, doAutoImport, log)
|
||||
}
|
||||
case "poll":
|
||||
log.log("Using polling mode (interval: %v)", interval)
|
||||
|
||||
@@ -70,7 +70,7 @@ func runEventDrivenLoop(
|
||||
}
|
||||
}()
|
||||
|
||||
// Optional: Periodic health check (not a sync poll)
|
||||
// Optional: Periodic health check and dropped events safety net
|
||||
healthTicker := time.NewTicker(60 * time.Second)
|
||||
defer healthTicker.Stop()
|
||||
|
||||
@@ -79,6 +79,13 @@ func runEventDrivenLoop(
|
||||
case <-healthTicker.C:
|
||||
// Periodic health validation (not sync)
|
||||
checkDaemonHealth(ctx, store, log)
|
||||
|
||||
// Safety net: check for dropped mutation events
|
||||
dropped := server.ResetDroppedEventsCount()
|
||||
if dropped > 0 {
|
||||
log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
|
||||
exportDebouncer.Trigger()
|
||||
}
|
||||
|
||||
case sig := <-sigChan:
|
||||
if isReloadSignal(sig) {
|
||||
|
||||
Reference in New Issue
Block a user