Harden event-driven daemon for production

Three critical fixes to make event-driven mode production-ready:

1. Skip redundant imports: Check JSONL mtime vs DB mtime to avoid
   self-triggered import loops after export writes JSONL

2. Add server.Stop() in serverErrChan case: Ensures clean RPC
   server shutdown on errors

3. Fallback ticker (60s): When file watcher unavailable (e.g., network
   filesystems), fall back to periodic polling to detect remote changes

These minimal fixes address Oracle's concerns without over-engineering.
Event-driven mode is now safe for default.

Amp-Thread-ID: https://ampcode.com/threads/T-a9a67394-37ca-4b79-aa23-c5c011f9c0cd
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-10-31 20:18:05 -07:00
parent c460d00861
commit 349b892123
2 changed files with 53 additions and 14 deletions

View File

@@ -47,9 +47,13 @@ func runEventDrivenLoop(
watcher, err := NewFileWatcher(jsonlPath, func() {
importDebouncer.Trigger()
})
var fallbackTicker *time.Ticker
if err != nil {
log.log("WARNING: File watcher unavailable (%v), mutations will trigger export only", err)
log.log("WARNING: File watcher unavailable (%v), using 60s polling fallback", err)
watcher = nil
// Fallback ticker to check for remote changes when watcher unavailable
fallbackTicker = time.NewTicker(60 * time.Second)
defer fallbackTicker.Stop()
} else {
watcher.Start(ctx, log)
defer watcher.Close()
@@ -97,6 +101,16 @@ func runEventDrivenLoop(
// Periodic health validation (not sync)
checkDaemonHealth(ctx, store, log)
case <-func() <-chan time.Time {
if fallbackTicker != nil {
return fallbackTicker.C
}
// Never fire if watcher is available
return make(chan time.Time)
}():
log.log("Fallback ticker: checking for remote changes")
importDebouncer.Trigger()
case sig := <-sigChan:
if isReloadSignal(sig) {
log.log("Received reload signal, ignoring")
@@ -120,12 +134,15 @@ func runEventDrivenLoop(
return
case err := <-serverErrChan:
log.log("RPC server failed: %v", err)
cancel()
if watcher != nil {
watcher.Close()
}
return
log.log("RPC server failed: %v", err)
cancel()
if watcher != nil {
watcher.Close()
}
if stopErr := server.Stop(); stopErr != nil {
log.log("Error stopping server: %v", stopErr)
}
return
}
}
}