Files
beads/cmd/bd/daemon_event_loop.go
Charles P. Cross 737e65afbd fix(daemon): add periodic remote sync to event-driven mode (#698)
* fix(daemon): add periodic remote sync to event-driven mode

The event-driven daemon mode only triggered imports when the local JSONL
file changed (via file watcher) or when the fallback ticker fired (only
if watcher failed). This meant the daemon wouldn't see updates pushed
by other clones until something triggered a local file change.

Bug scenario:
1. Clone A creates an issue and daemon pushes to sync branch
2. Clone B's daemon only watched local file changes
3. Clone B would not see the new issue until something triggered local change
4. With this fix: Clone B's daemon periodically calls doAutoImport

This fix adds a 30-second periodic remote sync ticker that calls
doAutoImport(), which includes syncBranchPull() to fetch and import
updates from the remote sync branch.

This is essential for multi-clone workflows where:
- Clone A creates an issue and daemon pushes to sync branch
- Clone B's daemon needs to periodically pull to see the new issue
- Without periodic sync, Clone B would only see updates if its local
  JSONL file happened to change

The 30-second interval balances responsiveness with network overhead.

Adds integration test TestEventDrivenLoop_PeriodicRemoteSync that
verifies the event-driven loop starts with periodic sync support.

* feat(daemon): add configurable interval for periodic remote sync

- Add BEADS_REMOTE_SYNC_INTERVAL environment variable to configure
  the interval for periodic remote sync (default: 30s)
- Add getRemoteSyncInterval() function to parse the env var
- Minimum interval is 5s to prevent excessive load
- Setting to 0 disables periodic sync (not recommended)
- Add comprehensive integration tests for the configuration

Valid duration formats:
- "30s" (30 seconds)
- "1m" (1 minute)
- "5m" (5 minutes)

Tests added:
- TestEventDrivenLoop_HasRemoteSyncTicker
- TestGetRemoteSyncInterval_Default
- TestGetRemoteSyncInterval_CustomValue
- TestGetRemoteSyncInterval_MinimumEnforced
- TestGetRemoteSyncInterval_InvalidValue
- TestGetRemoteSyncInterval_Zero
- TestSyncBranchPull_FetchesRemoteUpdates

* fix: resolve all golangci-lint errors (cherry-pick from fix/linting-errors)

Cherry-picked linting fixes to ensure CI passes.

* feat(daemon): add config.yaml support for remote-sync-interval

- Add remote-sync-interval to .beads/config.yaml as alternative to
  BEADS_REMOTE_SYNC_INTERVAL environment variable
- Environment variable takes precedence over config.yaml (follows
  existing pattern for flush-debounce)
- Add config binding in internal/config/config.go
- Update getRemoteSyncInterval() to use config.GetDuration()
- Add doctor validation for remote-sync-interval in config.yaml

Configuration sources (in order of precedence):
1. BEADS_REMOTE_SYNC_INTERVAL environment variable
2. remote-sync-interval in .beads/config.yaml
3. DefaultRemoteSyncInterval (30s)

Example config.yaml:
  remote-sync-interval: "1m"

---------

Co-authored-by: Charles P. Cross <cpdata@users.noreply.github.com>
2025-12-22 14:15:33 -08:00

290 lines
9.1 KiB
Go

package main
import (
"context"
"os"
"os/signal"
"runtime"
"time"
"github.com/steveyegge/beads/internal/config"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/storage"
)
// DefaultRemoteSyncInterval is the default interval for the periodic remote
// sync ticker used by runEventDrivenLoop to pull updates from other clones.
// Can be overridden via the BEADS_REMOTE_SYNC_INTERVAL environment variable
// or the remote-sync-interval key in .beads/config.yaml; see
// getRemoteSyncInterval for the resolution order and validation rules.
const DefaultRemoteSyncInterval = 30 * time.Second
// runEventDrivenLoop implements event-driven daemon architecture.
// Replaces polling ticker with reactive event handlers for:
//   - File system changes (JSONL modifications)
//   - RPC mutations (create, update, delete)
//   - Git operations (via hooks, optional)
//   - Parent process monitoring (exit if parent dies)
//   - Periodic remote sync (to pull updates from other clones)
//
// The periodic remote sync interval is resolved by getRemoteSyncInterval,
// which consults the BEADS_REMOTE_SYNC_INTERVAL environment variable and
// .beads/config.yaml, falling back to DefaultRemoteSyncInterval.
//
// The function blocks until a shutdown condition occurs: an OS signal, parent
// process death, context cancellation, or an RPC server failure.
func runEventDrivenLoop(
	ctx context.Context,
	cancel context.CancelFunc,
	server *rpc.Server,
	serverErrChan chan error,
	store storage.Storage,
	jsonlPath string,
	doExport func(),
	doAutoImport func(),
	parentPID int,
	log daemonLogger,
) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, daemonSignals...)
	defer signal.Stop(sigChan)

	// Debounced sync actions: coalesce bursts of events into a single
	// export/import rather than running one per event.
	exportDebouncer := NewDebouncer(500*time.Millisecond, func() {
		log.log("Export triggered by mutation events")
		doExport()
	})
	defer exportDebouncer.Cancel()
	importDebouncer := NewDebouncer(500*time.Millisecond, func() {
		log.log("Import triggered by file change")
		doAutoImport()
	})
	defer importDebouncer.Cancel()

	// Start file watcher for JSONL changes.
	watcher, err := NewFileWatcher(jsonlPath, func() {
		importDebouncer.Trigger()
	})
	var fallbackTicker *time.Ticker
	// fallbackC stays nil while the watcher is healthy; receiving from a nil
	// channel blocks forever, so the fallback case in the select loop below
	// simply never fires. This replaces the previous per-iteration closure
	// that allocated a fresh throwaway channel on every loop pass.
	var fallbackC <-chan time.Time
	if err != nil {
		log.log("WARNING: File watcher unavailable (%v), using 60s polling fallback", err)
		watcher = nil
		// Fallback ticker to check for remote changes when watcher unavailable
		fallbackTicker = time.NewTicker(60 * time.Second)
		defer fallbackTicker.Stop()
		fallbackC = fallbackTicker.C
	} else {
		watcher.Start(ctx, log)
		defer func() { _ = watcher.Close() }()
	}

	// Handle mutation events from the RPC server in a dedicated goroutine;
	// it exits when the context is canceled or the channel closes.
	mutationChan := server.MutationChan()
	go func() {
		for {
			select {
			case event, ok := <-mutationChan:
				if !ok {
					// Channel closed (should never happen, but handle defensively)
					log.log("Mutation channel closed; exiting listener")
					return
				}
				log.log("Mutation detected: %s %s", event.Type, event.IssueID)
				exportDebouncer.Trigger()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Periodic health check (validation only; no sync work).
	healthTicker := time.NewTicker(60 * time.Second)
	defer healthTicker.Stop()

	// Periodic remote sync to pull updates from other clones.
	// This is essential for multi-clone workflows where the file watcher only
	// sees local changes but remote may have updates from other clones.
	// Default is 30 seconds; configurable via BEADS_REMOTE_SYNC_INTERVAL.
	remoteSyncInterval := getRemoteSyncInterval(log)
	remoteSyncTicker := time.NewTicker(remoteSyncInterval)
	defer remoteSyncTicker.Stop()

	// Parent process check (every 10 seconds).
	parentCheckTicker := time.NewTicker(10 * time.Second)
	defer parentCheckTicker.Stop()

	// Dropped events safety net (faster recovery than health check).
	droppedEventsTicker := time.NewTicker(1 * time.Second)
	defer droppedEventsTicker.Stop()

	for {
		select {
		case <-droppedEventsTicker.C:
			// Check for dropped mutation events every second
			dropped := server.ResetDroppedEventsCount()
			if dropped > 0 {
				log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
				exportDebouncer.Trigger()
			}
		case <-healthTicker.C:
			// Periodic health validation (not sync)
			checkDaemonHealth(ctx, store, log)
		case <-remoteSyncTicker.C:
			// Periodic remote sync to pull updates from other clones.
			// This ensures the daemon sees changes pushed by other clones
			// even when the local file watcher doesn't trigger.
			log.log("Periodic remote sync: checking for updates")
			doAutoImport()
		case <-parentCheckTicker.C:
			// Check if parent process is still alive
			if !checkParentProcessAlive(parentPID) {
				log.log("Parent process (PID %d) died, shutting down daemon", parentPID)
				cancel()
				if err := server.Stop(); err != nil {
					log.log("Error stopping server: %v", err)
				}
				return
			}
		case <-fallbackC:
			// Only reachable when the file watcher failed to start (see above).
			log.log("Fallback ticker: checking for remote changes")
			importDebouncer.Trigger()
		case sig := <-sigChan:
			if isReloadSignal(sig) {
				log.log("Received reload signal, ignoring")
				continue
			}
			log.log("Received signal %v, shutting down...", sig)
			cancel()
			if err := server.Stop(); err != nil {
				log.log("Error stopping server: %v", err)
			}
			return
		case <-ctx.Done():
			log.log("Context canceled, shutting down")
			if watcher != nil {
				_ = watcher.Close()
			}
			if err := server.Stop(); err != nil {
				log.log("Error stopping server: %v", err)
			}
			return
		case err := <-serverErrChan:
			log.log("RPC server failed: %v", err)
			cancel()
			if watcher != nil {
				_ = watcher.Close()
			}
			if stopErr := server.Stop(); stopErr != nil {
				log.log("Error stopping server: %v", stopErr)
			}
			return
		}
	}
}
// checkDaemonHealth performs periodic health validation.
// Separate from sync operations - just validates state.
//
// Runs four independent, best-effort checks; failures are logged but never
// abort the daemon.
//
// Implements bd-e0o: Phase 3 daemon robustness for GH #353
// Implements bd-gqo: Additional health checks
func checkDaemonHealth(ctx context.Context, store storage.Storage, log daemonLogger) {
	// Health check 1: Verify metadata is accessible.
	// This helps detect if external operations (like bd import --force) have
	// modified metadata; without it the daemon may keep a stale metadata cache.
	// Try the new key first, fall back to the old one for migration (bd-39o).
	if _, newKeyErr := store.GetMetadata(ctx, "jsonl_content_hash"); newKeyErr != nil {
		if _, err := store.GetMetadata(ctx, "last_import_hash"); err != nil {
			log.log("Health check: metadata read failed: %v", err)
			// Non-fatal: daemon continues but logs the issue.
			// This helps diagnose stuck states in sandboxed environments.
		}
	}

	// Health check 2: Database integrity check.
	// Verify the database is accessible and structurally sound via a quick
	// query rather than a full integrity scan.
	if db := store.UnderlyingDB(); db != nil {
		var verdict string
		scanErr := db.QueryRowContext(ctx, "PRAGMA quick_check(1)").Scan(&verdict)
		switch {
		case scanErr != nil:
			log.log("Health check: database integrity check failed: %v", scanErr)
		case verdict != "ok":
			log.log("Health check: database integrity issue: %s", verdict)
		}
	}

	// Health check 3: Disk space check (platform-specific).
	// Uses the checkDiskSpace helper, which is implemented per-platform.
	if dbPath := store.Path(); dbPath != "" {
		// Warn if less than 100MB available.
		if availableMB, ok := checkDiskSpace(dbPath); ok && availableMB < 100 {
			log.log("Health check: low disk space warning: %dMB available", availableMB)
		}
	}

	// Health check 4: Memory usage check.
	// The daemon should be lightweight; warn if the heap exceeds 500MB.
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	if heapMB := stats.HeapAlloc / (1024 * 1024); heapMB > 500 {
		log.log("Health check: high memory usage warning: %dMB heap allocated", heapMB)
	}
}
// getRemoteSyncInterval returns the interval for periodic remote sync.
// Configuration sources (in order of precedence):
//  1. BEADS_REMOTE_SYNC_INTERVAL environment variable
//  2. remote-sync-interval in .beads/config.yaml
//  3. DefaultRemoteSyncInterval (30s)
//
// Accepts Go duration strings like:
//   - "30s" (30 seconds)
//   - "1m" (1 minute)
//   - "5m" (5 minutes)
//   - "0" or any zero duration (disables periodic sync - use with caution)
//
// Minimum allowed value is 5 seconds to prevent excessive load; negative
// values are rejected (they would make time.NewTicker panic) and replaced
// with the default. The returned duration is always positive.
func getRemoteSyncInterval(log daemonLogger) time.Duration {
	// config.GetDuration handles both config.yaml and env var (env takes precedence)
	duration := config.GetDuration("remote-sync-interval")

	// If config returns 0, it could mean:
	// 1. User explicitly set a zero duration to disable
	// 2. Config not found (use default)
	if duration == 0 {
		// Check whether the user explicitly disabled sync via the env var.
		// Parse the value as a duration so any zero spelling ("0", "0s",
		// "0m", ...) disables sync, not just the literal strings "0"/"0s".
		if envVal := os.Getenv("BEADS_REMOTE_SYNC_INTERVAL"); envVal != "" {
			if d, parseErr := time.ParseDuration(envVal); parseErr == nil && d == 0 {
				log.log("Warning: remote-sync-interval is 0, periodic remote sync disabled")
				// Effectively disabled: a one-year interval never fires in practice.
				return 24 * time.Hour * 365
			}
		}
		// Otherwise use default
		return DefaultRemoteSyncInterval
	}

	// Negative intervals are invalid and would panic in time.NewTicker;
	// fall back to the default rather than crashing the daemon.
	if duration < 0 {
		log.log("Warning: remote-sync-interval is negative (%v), using default %v", duration, DefaultRemoteSyncInterval)
		return DefaultRemoteSyncInterval
	}

	// Minimum 5 seconds to prevent excessive load
	if duration < 5*time.Second {
		log.log("Warning: remote-sync-interval too low (%v), using minimum 5s", duration)
		return 5 * time.Second
	}

	// Log if using non-default value
	if duration != DefaultRemoteSyncInterval {
		log.log("Using custom remote sync interval: %v", duration)
	}
	return duration
}