Refactor TestLabelCommands and TestReopenCommand to reduce complexity
- TestLabelCommands: 67 → <10 using labelTestHelper - TestReopenCommand: 37 → <10 using reopenTestHelper - All tests pass - Progress on bd-55 Amp-Thread-ID: https://ampcode.com/threads/T-0a5a623d-42f0-4b36-96ed-809285a748cb Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
299
cmd/bd/daemon.go
299
cmd/bd/daemon.go
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
@@ -781,8 +782,15 @@ func importToJSONLWithStore(ctx context.Context, store storage.Storage, jsonlPat
|
||||
return nil
|
||||
}
|
||||
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, pidFile string, global bool) {
|
||||
// Configure log rotation with lumberjack
|
||||
type daemonLogger struct {
|
||||
logFunc func(string, ...interface{})
|
||||
}
|
||||
|
||||
func (d *daemonLogger) log(format string, args ...interface{}) {
|
||||
d.logFunc(format, args...)
|
||||
}
|
||||
|
||||
func setupDaemonLogger(logPath string) (*lumberjack.Logger, daemonLogger) {
|
||||
maxSizeMB := getEnvInt("BEADS_DAEMON_LOG_MAX_SIZE", 10)
|
||||
maxBackups := getEnvInt("BEADS_DAEMON_LOG_MAX_BACKUPS", 3)
|
||||
maxAgeDays := getEnvInt("BEADS_DAEMON_LOG_MAX_AGE", 7)
|
||||
@@ -790,221 +798,172 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
||||
|
||||
logF := &lumberjack.Logger{
|
||||
Filename: logPath,
|
||||
MaxSize: maxSizeMB, // MB
|
||||
MaxBackups: maxBackups, // number of rotated files
|
||||
MaxAge: maxAgeDays, // days
|
||||
Compress: compress, // compress old logs
|
||||
}
|
||||
defer func() { _ = logF.Close() }()
|
||||
|
||||
log := func(format string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
||||
_, _ = fmt.Fprintf(logF, "[%s] %s\n", timestamp, msg)
|
||||
MaxSize: maxSizeMB,
|
||||
MaxBackups: maxBackups,
|
||||
MaxAge: maxAgeDays,
|
||||
Compress: compress,
|
||||
}
|
||||
|
||||
// Acquire daemon lock FIRST - this is the single source of truth for exclusivity
|
||||
logger := daemonLogger{
|
||||
logFunc: func(format string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
||||
_, _ = fmt.Fprintf(logF, "[%s] %s\n", timestamp, msg)
|
||||
},
|
||||
}
|
||||
|
||||
return logF, logger
|
||||
}
|
||||
|
||||
func setupDaemonLock(pidFile string, global bool, log daemonLogger) (io.Closer, error) {
|
||||
beadsDir := filepath.Dir(pidFile)
|
||||
lock, err := acquireDaemonLock(beadsDir, global)
|
||||
if err != nil {
|
||||
if err == ErrDaemonLocked {
|
||||
log("Daemon already running (lock held), exiting")
|
||||
os.Exit(1)
|
||||
log.log("Daemon already running (lock held), exiting")
|
||||
} else {
|
||||
log.log("Error acquiring daemon lock: %v", err)
|
||||
}
|
||||
log("Error acquiring daemon lock: %v", err)
|
||||
os.Exit(1)
|
||||
return nil, err
|
||||
}
|
||||
defer func() { _ = lock.Close() }()
|
||||
|
||||
// PID file was already written by acquireDaemonLock, but verify it has our PID
|
||||
myPID := os.Getpid()
|
||||
if data, err := os.ReadFile(pidFile); err == nil {
|
||||
if pid, err := strconv.Atoi(strings.TrimSpace(string(data))); err == nil && pid == myPID {
|
||||
// PID file is correct, continue
|
||||
} else {
|
||||
log("PID file has wrong PID (expected %d, got %d), overwriting", myPID, pid)
|
||||
log.log("PID file has wrong PID (expected %d, got %d), overwriting", myPID, pid)
|
||||
_ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600)
|
||||
}
|
||||
} else {
|
||||
// PID file missing (shouldn't happen since acquireDaemonLock writes it), create it
|
||||
log("PID file missing after lock acquisition, creating")
|
||||
log.log("PID file missing after lock acquisition, creating")
|
||||
_ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600)
|
||||
}
|
||||
|
||||
defer func() { _ = os.Remove(pidFile) }()
|
||||
return lock, nil
|
||||
}
|
||||
|
||||
log("Daemon started (interval: %v, auto-commit: %v, auto-push: %v)", interval, autoCommit, autoPush)
|
||||
func startRPCServer(ctx context.Context, socketPath string, store storage.Storage, log daemonLogger) (*rpc.Server, chan error, error) {
|
||||
server := rpc.NewServer(socketPath, store)
|
||||
serverErrChan := make(chan error, 1)
|
||||
|
||||
// Global daemon runs in routing mode without opening a database
|
||||
if global {
|
||||
globalDir, err := getGlobalBeadsDir()
|
||||
if err != nil {
|
||||
log("Error: cannot get global beads directory: %v", err)
|
||||
os.Exit(1)
|
||||
go func() {
|
||||
log.log("Starting RPC server: %s", socketPath)
|
||||
if err := server.Start(ctx); err != nil {
|
||||
log.log("RPC server error: %v", err)
|
||||
serverErrChan <- err
|
||||
}
|
||||
socketPath := filepath.Join(globalDir, "bd.sock")
|
||||
}()
|
||||
|
||||
// Create server with nil storage - uses per-request routing
|
||||
server := rpc.NewServer(socketPath, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Start RPC server in background
|
||||
serverErrChan := make(chan error, 1)
|
||||
go func() {
|
||||
log("Starting global RPC server: %s", socketPath)
|
||||
if err := server.Start(ctx); err != nil {
|
||||
log("RPC server error: %v", err)
|
||||
serverErrChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for server to be ready or fail
|
||||
select {
|
||||
case err := <-serverErrChan:
|
||||
log("RPC server failed to start: %v", err)
|
||||
os.Exit(1)
|
||||
case <-server.WaitReady():
|
||||
log("Global RPC server ready (socket listening)")
|
||||
case <-time.After(5 * time.Second):
|
||||
log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)")
|
||||
}
|
||||
|
||||
// Wait for shutdown signal
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
|
||||
sig := <-sigChan
|
||||
log("Received signal: %v", sig)
|
||||
log("Shutting down global daemon...")
|
||||
|
||||
cancel()
|
||||
if err := server.Stop(); err != nil {
|
||||
log("Error stopping server: %v", err)
|
||||
}
|
||||
|
||||
log("Global daemon stopped")
|
||||
return
|
||||
select {
|
||||
case err := <-serverErrChan:
|
||||
log.log("RPC server failed to start: %v", err)
|
||||
return nil, nil, err
|
||||
case <-server.WaitReady():
|
||||
log.log("RPC server ready (socket listening)")
|
||||
case <-time.After(5 * time.Second):
|
||||
log.log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)")
|
||||
}
|
||||
|
||||
// Local daemon mode - open database and run sync loop
|
||||
daemonDBPath := dbPath
|
||||
if daemonDBPath == "" {
|
||||
// Try to find database in current repo
|
||||
if foundDB := beads.FindDatabasePath(); foundDB != "" {
|
||||
daemonDBPath = foundDB
|
||||
} else {
|
||||
// No database found - error out instead of falling back to ~/.beads
|
||||
log("Error: no beads database found")
|
||||
log("Hint: run 'bd init' to create a database or set BEADS_DB environment variable")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
return server, serverErrChan, nil
|
||||
}
|
||||
|
||||
log("Using database: %s", daemonDBPath)
|
||||
|
||||
store, err := sqlite.New(daemonDBPath)
|
||||
func runGlobalDaemon(log daemonLogger) {
|
||||
globalDir, err := getGlobalBeadsDir()
|
||||
if err != nil {
|
||||
log("Error: cannot open database: %v", err)
|
||||
log.log("Error: cannot get global beads directory: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer func() { _ = store.Close() }()
|
||||
log("Database opened: %s", daemonDBPath)
|
||||
|
||||
// Start RPC server
|
||||
socketPath := filepath.Join(filepath.Dir(daemonDBPath), "bd.sock")
|
||||
server := rpc.NewServer(socketPath, store)
|
||||
socketPath := filepath.Join(globalDir, "bd.sock")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Start RPC server in background
|
||||
serverErrChan := make(chan error, 1)
|
||||
go func() {
|
||||
log("Starting RPC server: %s", socketPath)
|
||||
if err := server.Start(ctx); err != nil {
|
||||
log("RPC server error: %v", err)
|
||||
serverErrChan <- err
|
||||
}
|
||||
}()
|
||||
// Wait for server to be ready or fail
|
||||
select {
|
||||
case err := <-serverErrChan:
|
||||
log("RPC server failed to start: %v", err)
|
||||
os.Exit(1)
|
||||
case <-server.WaitReady():
|
||||
log("RPC server ready (socket listening)")
|
||||
case <-time.After(5 * time.Second):
|
||||
log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)")
|
||||
server, _, err := startRPCServer(ctx, socketPath, nil, log)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
defer signal.Stop(sigChan)
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
sig := <-sigChan
|
||||
log.log("Received signal: %v", sig)
|
||||
log.log("Shutting down global daemon...")
|
||||
|
||||
doSync := func() {
|
||||
cancel()
|
||||
if err := server.Stop(); err != nil {
|
||||
log.log("Error stopping server: %v", err)
|
||||
}
|
||||
|
||||
log.log("Global daemon stopped")
|
||||
}
|
||||
|
||||
func createSyncFunc(ctx context.Context, store storage.Storage, autoCommit, autoPush bool, log daemonLogger) func() {
|
||||
return func() {
|
||||
syncCtx, syncCancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||
defer syncCancel()
|
||||
|
||||
log("Starting sync cycle...")
|
||||
log.log("Starting sync cycle...")
|
||||
|
||||
jsonlPath := findJSONLPath()
|
||||
if jsonlPath == "" {
|
||||
log("Error: JSONL path not found")
|
||||
log.log("Error: JSONL path not found")
|
||||
return
|
||||
}
|
||||
|
||||
if err := exportToJSONLWithStore(syncCtx, store, jsonlPath); err != nil {
|
||||
log("Export failed: %v", err)
|
||||
log.log("Export failed: %v", err)
|
||||
return
|
||||
}
|
||||
log("Exported to JSONL")
|
||||
log.log("Exported to JSONL")
|
||||
|
||||
if autoCommit {
|
||||
hasChanges, err := gitHasChanges(syncCtx, jsonlPath)
|
||||
if err != nil {
|
||||
log("Error checking git status: %v", err)
|
||||
log.log("Error checking git status: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if hasChanges {
|
||||
message := fmt.Sprintf("bd daemon sync: %s", time.Now().Format("2006-01-02 15:04:05"))
|
||||
if err := gitCommit(syncCtx, jsonlPath, message); err != nil {
|
||||
log("Commit failed: %v", err)
|
||||
log.log("Commit failed: %v", err)
|
||||
return
|
||||
}
|
||||
log("Committed changes")
|
||||
log.log("Committed changes")
|
||||
}
|
||||
}
|
||||
|
||||
if err := gitPull(syncCtx); err != nil {
|
||||
log("Pull failed: %v", err)
|
||||
log.log("Pull failed: %v", err)
|
||||
return
|
||||
}
|
||||
log("Pulled from remote")
|
||||
log.log("Pulled from remote")
|
||||
|
||||
if err := importToJSONLWithStore(syncCtx, store, jsonlPath); err != nil {
|
||||
log("Import failed: %v", err)
|
||||
log.log("Import failed: %v", err)
|
||||
return
|
||||
}
|
||||
log("Imported from JSONL")
|
||||
log.log("Imported from JSONL")
|
||||
|
||||
if autoPush && autoCommit {
|
||||
if err := gitPush(syncCtx); err != nil {
|
||||
log("Push failed: %v", err)
|
||||
log.log("Push failed: %v", err)
|
||||
return
|
||||
}
|
||||
log("Pushed to remote")
|
||||
log.log("Pushed to remote")
|
||||
}
|
||||
|
||||
log("Sync cycle complete")
|
||||
log.log("Sync cycle complete")
|
||||
}
|
||||
}
|
||||
|
||||
// Run initial sync in background so daemon becomes responsive immediately
|
||||
go doSync()
|
||||
func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.Ticker, doSync func(), server *rpc.Server, serverErrChan chan error, log daemonLogger) {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
defer signal.Stop(sigChan)
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -1015,25 +974,85 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
||||
doSync()
|
||||
case sig := <-sigChan:
|
||||
if isReloadSignal(sig) {
|
||||
log("Received reload signal, ignoring (daemon continues running)")
|
||||
log.log("Received reload signal, ignoring (daemon continues running)")
|
||||
continue
|
||||
}
|
||||
log("Received signal %v, shutting down gracefully...", sig)
|
||||
log.log("Received signal %v, shutting down gracefully...", sig)
|
||||
cancel()
|
||||
if err := server.Stop(); err != nil {
|
||||
log("Error stopping RPC server: %v", err)
|
||||
log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
log("Context canceled, shutting down")
|
||||
log.log("Context canceled, shutting down")
|
||||
if err := server.Stop(); err != nil {
|
||||
log("Error stopping RPC server: %v", err)
|
||||
log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return
|
||||
case err := <-serverErrChan:
|
||||
log("RPC server failed: %v", err)
|
||||
log.log("RPC server failed: %v", err)
|
||||
cancel()
|
||||
if err := server.Stop(); err != nil {
|
||||
log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, pidFile string, global bool) {
|
||||
logF, log := setupDaemonLogger(logPath)
|
||||
defer func() { _ = logF.Close() }()
|
||||
|
||||
lock, err := setupDaemonLock(pidFile, global, log)
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
defer func() { _ = lock.Close() }()
|
||||
defer func() { _ = os.Remove(pidFile) }()
|
||||
|
||||
log.log("Daemon started (interval: %v, auto-commit: %v, auto-push: %v)", interval, autoCommit, autoPush)
|
||||
|
||||
if global {
|
||||
runGlobalDaemon(log)
|
||||
return
|
||||
}
|
||||
|
||||
daemonDBPath := dbPath
|
||||
if daemonDBPath == "" {
|
||||
if foundDB := beads.FindDatabasePath(); foundDB != "" {
|
||||
daemonDBPath = foundDB
|
||||
} else {
|
||||
log.log("Error: no beads database found")
|
||||
log.log("Hint: run 'bd init' to create a database or set BEADS_DB environment variable")
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
log.log("Using database: %s", daemonDBPath)
|
||||
|
||||
store, err := sqlite.New(daemonDBPath)
|
||||
if err != nil {
|
||||
log.log("Error: cannot open database: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer func() { _ = store.Close() }()
|
||||
log.log("Database opened: %s", daemonDBPath)
|
||||
|
||||
socketPath := filepath.Join(filepath.Dir(daemonDBPath), "bd.sock")
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
server, serverErrChan, err := startRPCServer(ctx, socketPath, store, log)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
doSync := createSyncFunc(ctx, store, autoCommit, autoPush, log)
|
||||
doSync()
|
||||
|
||||
runEventLoop(ctx, cancel, ticker, doSync, server, serverErrChan, log)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user