Fix daemon orphaning: track parent PID and exit when parent dies
- Add ParentPID field to DaemonLockInfo struct
- Daemon monitors parent process every 10 seconds
- Gracefully exits when parent process dies
- Prevents accumulation of orphaned daemons from dead sessions
- Fixes race conditions from multiple daemons on same database

Closes bd-zpnq
This commit is contained in:
@@ -372,6 +372,10 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
|||||||
doSync := createSyncFunc(ctx, store, autoCommit, autoPush, log)
|
doSync := createSyncFunc(ctx, store, autoCommit, autoPush, log)
|
||||||
doSync()
|
doSync()
|
||||||
|
|
||||||
|
// Get parent PID for monitoring (exit if parent dies)
|
||||||
|
parentPID := os.Getppid()
|
||||||
|
log.log("Monitoring parent process (PID %d)", parentPID)
|
||||||
|
|
||||||
// Choose event loop based on BEADS_DAEMON_MODE
|
// Choose event loop based on BEADS_DAEMON_MODE
|
||||||
daemonMode := os.Getenv("BEADS_DAEMON_MODE")
|
daemonMode := os.Getenv("BEADS_DAEMON_MODE")
|
||||||
if daemonMode == "" {
|
if daemonMode == "" {
|
||||||
@@ -385,18 +389,18 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
|||||||
if jsonlPath == "" {
|
if jsonlPath == "" {
|
||||||
log.log("Error: JSONL path not found, cannot use event-driven mode")
|
log.log("Error: JSONL path not found, cannot use event-driven mode")
|
||||||
log.log("Falling back to polling mode")
|
log.log("Falling back to polling mode")
|
||||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, parentPID, log)
|
||||||
} else {
|
} else {
|
||||||
// Event-driven mode uses separate export-only and import-only functions
|
// Event-driven mode uses separate export-only and import-only functions
|
||||||
doExport := createExportFunc(ctx, store, autoCommit, autoPush, log)
|
doExport := createExportFunc(ctx, store, autoCommit, autoPush, log)
|
||||||
doAutoImport := createAutoImportFunc(ctx, store, log)
|
doAutoImport := createAutoImportFunc(ctx, store, log)
|
||||||
runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doExport, doAutoImport, log)
|
runEventDrivenLoop(ctx, cancel, server, serverErrChan, store, jsonlPath, doExport, doAutoImport, parentPID, log)
|
||||||
}
|
}
|
||||||
case "poll":
|
case "poll":
|
||||||
log.log("Using polling mode (interval: %v)", interval)
|
log.log("Using polling mode (interval: %v)", interval)
|
||||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, parentPID, log)
|
||||||
default:
|
default:
|
||||||
log.log("Unknown BEADS_DAEMON_MODE: %s (valid: poll, events), defaulting to poll", daemonMode)
|
log.log("Unknown BEADS_DAEMON_MODE: %s (valid: poll, events), defaulting to poll", daemonMode)
|
||||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, parentPID, log)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
// - File system changes (JSONL modifications)
|
// - File system changes (JSONL modifications)
|
||||||
// - RPC mutations (create, update, delete)
|
// - RPC mutations (create, update, delete)
|
||||||
// - Git operations (via hooks, optional)
|
// - Git operations (via hooks, optional)
|
||||||
|
// - Parent process monitoring (exit if parent dies)
|
||||||
func runEventDrivenLoop(
|
func runEventDrivenLoop(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
cancel context.CancelFunc,
|
cancel context.CancelFunc,
|
||||||
@@ -24,6 +25,7 @@ func runEventDrivenLoop(
|
|||||||
jsonlPath string,
|
jsonlPath string,
|
||||||
doExport func(),
|
doExport func(),
|
||||||
doAutoImport func(),
|
doAutoImport func(),
|
||||||
|
parentPID int,
|
||||||
log daemonLogger,
|
log daemonLogger,
|
||||||
) {
|
) {
|
||||||
sigChan := make(chan os.Signal, 1)
|
sigChan := make(chan os.Signal, 1)
|
||||||
@@ -83,6 +85,10 @@ func runEventDrivenLoop(
|
|||||||
healthTicker := time.NewTicker(60 * time.Second)
|
healthTicker := time.NewTicker(60 * time.Second)
|
||||||
defer healthTicker.Stop()
|
defer healthTicker.Stop()
|
||||||
|
|
||||||
|
// Parent process check (every 10 seconds)
|
||||||
|
parentCheckTicker := time.NewTicker(10 * time.Second)
|
||||||
|
defer parentCheckTicker.Stop()
|
||||||
|
|
||||||
// Dropped events safety net (faster recovery than health check)
|
// Dropped events safety net (faster recovery than health check)
|
||||||
droppedEventsTicker := time.NewTicker(1 * time.Second)
|
droppedEventsTicker := time.NewTicker(1 * time.Second)
|
||||||
defer droppedEventsTicker.Stop()
|
defer droppedEventsTicker.Stop()
|
||||||
@@ -101,6 +107,17 @@ func runEventDrivenLoop(
|
|||||||
// Periodic health validation (not sync)
|
// Periodic health validation (not sync)
|
||||||
checkDaemonHealth(ctx, store, log)
|
checkDaemonHealth(ctx, store, log)
|
||||||
|
|
||||||
|
case <-parentCheckTicker.C:
|
||||||
|
// Check if parent process is still alive
|
||||||
|
if !checkParentProcessAlive(parentPID) {
|
||||||
|
log.log("Parent process (PID %d) died, shutting down daemon", parentPID)
|
||||||
|
cancel()
|
||||||
|
if err := server.Stop(); err != nil {
|
||||||
|
log.log("Error stopping server: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
case <-func() <-chan time.Time {
|
case <-func() <-chan time.Time {
|
||||||
if fallbackTicker != nil {
|
if fallbackTicker != nil {
|
||||||
return fallbackTicker.C
|
return fallbackTicker.C
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ var ErrDaemonLocked = errors.New("daemon lock already held by another process")
|
|||||||
// DaemonLockInfo represents the metadata stored in the daemon.lock file
|
// DaemonLockInfo represents the metadata stored in the daemon.lock file
|
||||||
type DaemonLockInfo struct {
|
type DaemonLockInfo struct {
|
||||||
PID int `json:"pid"`
|
PID int `json:"pid"`
|
||||||
|
ParentPID int `json:"parent_pid,omitempty"` // Parent process ID (0 if not tracked)
|
||||||
Database string `json:"database"`
|
Database string `json:"database"`
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
StartedAt time.Time `json:"started_at"`
|
StartedAt time.Time `json:"started_at"`
|
||||||
@@ -63,6 +64,7 @@ func acquireDaemonLock(beadsDir string, dbPath string) (*DaemonLock, error) {
|
|||||||
// Write JSON metadata to the lock file
|
// Write JSON metadata to the lock file
|
||||||
lockInfo := DaemonLockInfo{
|
lockInfo := DaemonLockInfo{
|
||||||
PID: os.Getpid(),
|
PID: os.Getpid(),
|
||||||
|
ParentPID: os.Getppid(),
|
||||||
Database: dbPath,
|
Database: dbPath,
|
||||||
Version: Version,
|
Version: Version,
|
||||||
StartedAt: time.Now().UTC(),
|
StartedAt: time.Now().UTC(),
|
||||||
|
|||||||
45
cmd/bd/daemon_parent_test.go
Normal file
45
cmd/bd/daemon_parent_test.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestDaemonExitsWhenParentDies documents the manual procedure for checking
// that the daemon shuts itself down once the process that spawned it is gone.
// It never executes any checks itself: it always skips, with a dedicated
// message when the test binary runs in short mode.
//
// Manual scenario:
//  1. Start a shell process that spawns the daemon
//  2. Verify daemon tracks parent PID
//  3. Kill the shell process
//  4. Verify daemon exits within 10-15 seconds
//
// To test manually:
//
//	$ sh -c 'bd daemon --interval 5s & sleep 100' &
//	$ SHELL_PID=$!
//	$ # Check daemon.lock has parent_pid set to SHELL_PID
//	$ kill $SHELL_PID
//	$ # Daemon should exit within 10-15 seconds
func TestDaemonExitsWhenParentDies(t *testing.T) {
	switch {
	case testing.Short():
		// Integration-style test: excluded from short runs.
		t.Skip("Skipping integration test in short mode")
	default:
		// The daemon has to be launched by hand, outside the test binary.
		t.Skip("Manual test - requires daemon to be run externally")
	}
}
|
||||||
|
|
||||||
|
// mustAbs converts path to an absolute path via filepath.Abs and aborts the
// calling test with t.Fatalf if the conversion reports an error.
func mustAbs(t *testing.T, path string) string {
	resolved, absErr := filepath.Abs(path)
	if absErr == nil {
		return resolved
	}
	t.Fatalf("Failed to get absolute path: %v", absErr)
	return resolved
}
|
||||||
|
|
||||||
|
// runGitCmd executes `git <args...>` with dir as its working directory and
// aborts the calling test with t.Fatalf if the command does not run
// successfully.
func runGitCmd(t *testing.T, dir string, args ...string) {
	gitCmd := exec.Command("git", args...)
	gitCmd.Dir = dir

	runErr := gitCmd.Run()
	if runErr == nil {
		return
	}
	t.Fatalf("git %v failed: %v", args, runErr)
}
|
||||||
@@ -73,12 +73,34 @@ func runGlobalDaemon(log daemonLogger) {
|
|||||||
log.log("Global daemon stopped")
|
log.log("Global daemon stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkParentProcessAlive checks if the parent process is still running.
|
||||||
|
// Returns true if parent is alive, false if it died.
|
||||||
|
// Returns true if parent PID is 0 or 1 (not tracked, or adopted by init).
|
||||||
|
func checkParentProcessAlive(parentPID int) bool {
|
||||||
|
if parentPID == 0 {
|
||||||
|
// Parent PID not tracked (older lock files)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if parentPID == 1 {
|
||||||
|
// Adopted by init - parent died
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if parent process is running
|
||||||
|
return isProcessRunning(parentPID)
|
||||||
|
}
|
||||||
|
|
||||||
// runEventLoop runs the daemon event loop (polling mode)
|
// runEventLoop runs the daemon event loop (polling mode)
|
||||||
func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.Ticker, doSync func(), server *rpc.Server, serverErrChan chan error, log daemonLogger) {
|
func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.Ticker, doSync func(), server *rpc.Server, serverErrChan chan error, parentPID int, log daemonLogger) {
|
||||||
sigChan := make(chan os.Signal, 1)
|
sigChan := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigChan, daemonSignals...)
|
signal.Notify(sigChan, daemonSignals...)
|
||||||
defer signal.Stop(sigChan)
|
defer signal.Stop(sigChan)
|
||||||
|
|
||||||
|
// Parent process check (every 10 seconds)
|
||||||
|
parentCheckTicker := time.NewTicker(10 * time.Second)
|
||||||
|
defer parentCheckTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -86,6 +108,16 @@ func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.T
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
doSync()
|
doSync()
|
||||||
|
case <-parentCheckTicker.C:
|
||||||
|
// Check if parent process is still alive
|
||||||
|
if !checkParentProcessAlive(parentPID) {
|
||||||
|
log.log("Parent process (PID %d) died, shutting down daemon", parentPID)
|
||||||
|
cancel()
|
||||||
|
if err := server.Stop(); err != nil {
|
||||||
|
log.log("Error stopping server: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
case sig := <-sigChan:
|
case sig := <-sigChan:
|
||||||
if isReloadSignal(sig) {
|
if isReloadSignal(sig) {
|
||||||
log.log("Received reload signal, ignoring (daemon continues running)")
|
log.log("Received reload signal, ignoring (daemon continues running)")
|
||||||
|
|||||||
Reference in New Issue
Block a user