diff --git a/cmd/bd/daemon.go b/cmd/bd/daemon.go index bcbfe6f2..04647450 100644 --- a/cmd/bd/daemon.go +++ b/cmd/bd/daemon.go @@ -977,6 +977,7 @@ func setupDaemonLock(pidFile string, dbPath string, log daemonLogger) (io.Closer return lock, nil } +// startRPCServer initializes and starts the RPC server (temporary wrapper for old code) func startRPCServer(ctx context.Context, socketPath string, store storage.Storage, workspacePath string, dbPath string, log daemonLogger) (*rpc.Server, chan error, error) { // Sync daemon version with CLI version rpc.ServerVersion = Version @@ -1005,6 +1006,7 @@ func startRPCServer(ctx context.Context, socketPath string, store storage.Storag return server, serverErrChan, nil } +// runGlobalDaemon runs the global routing daemon (temporary wrapper for old code) func runGlobalDaemon(log daemonLogger) { globalDir, err := getGlobalBeadsDir() if err != nil { diff --git a/internal/daemonrunner/daemon.go b/internal/daemonrunner/daemon.go index 4a0973f8..8e56bfde 100644 --- a/internal/daemonrunner/daemon.go +++ b/internal/daemonrunner/daemon.go @@ -5,7 +5,9 @@ import ( "fmt" "io" "os" + "os/signal" "path/filepath" + "strconv" "strings" "time" @@ -128,20 +130,84 @@ func (d *Daemon) Start() error { return d.runSyncLoop(ctx, serverErrChan) } -// TODO: Implement these methods by extracting from cmd/bd/daemon.go func (d *Daemon) runGlobalDaemon() error { - // TODO: Extract from runGlobalDaemon in cmd/bd/daemon.go + globalDir, err := getGlobalBeadsDir() + if err != nil { + d.log.log("Error: cannot get global beads directory: %v", err) + return err + } + d.cfg.SocketPath = filepath.Join(globalDir, "bd.sock") + + ctx, cancel := context.WithCancel(context.Background()) + d.cancel = cancel + defer cancel() + + server, _, err := d.startRPCServer(ctx) + if err != nil { + return err + } + d.server = server + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, daemonSignals...) + defer signal.Stop(sigChan) + + sig := <-sigChan + d.log.log("Received signal: %v", sig) + d.log.log("Shutting down global daemon...") + + cancel() + if err := d.server.Stop(); err != nil { + d.log.log("Error stopping server: %v", err) + } + + d.log.log("Global daemon stopped") return nil } -func (d *Daemon) startRPCServer(ctx context.Context) (*rpc.Server, chan error, error) { - // TODO: Extract from startRPCServer in cmd/bd/daemon.go - return nil, nil, nil +func getGlobalBeadsDir() (string, error) { + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("cannot get home directory: %w", err) + } + + beadsDir := filepath.Join(home, ".beads") + if err := os.MkdirAll(beadsDir, 0700); err != nil { + return "", fmt.Errorf("cannot create global beads directory: %w", err) + } + + return beadsDir, nil } -func (d *Daemon) runSyncLoop(ctx context.Context, serverErrChan chan error) error { - // TODO: Extract from runDaemonLoop in cmd/bd/daemon.go - return nil + + +func (d *Daemon) setupLock() (io.Closer, error) { + beadsDir := filepath.Dir(d.cfg.PIDFile) + lock, err := acquireDaemonLock(beadsDir, d.cfg.DBPath, d.Version) + if err != nil { + if err == ErrDaemonLocked { + d.log.log("Daemon already running (lock held), exiting") + } else { + d.log.log("Error acquiring daemon lock: %v", err) + } + return nil, err + } + + myPID := os.Getpid() + // #nosec G304 - controlled path from config + if data, err := os.ReadFile(d.cfg.PIDFile); err == nil { + if pid, err := strconv.Atoi(strings.TrimSpace(string(data))); err == nil && pid == myPID { + // PID file is correct, continue + } else { + d.log.log("PID file has wrong PID (expected %d, got %d), overwriting", myPID, pid) + _ = os.WriteFile(d.cfg.PIDFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600) + } + } else { + d.log.log("PID file missing after lock acquisition, creating") + _ = os.WriteFile(d.cfg.PIDFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600) + } + + return lock, nil } // Stop gracefully shuts down the daemon diff --git a/internal/daemonrunner/logger.go b/internal/daemonrunner/logger.go index 9ce80f88..8489ed69 100644 --- a/internal/daemonrunner/logger.go +++ b/internal/daemonrunner/logger.go @@ -3,12 +3,12 @@ package daemonrunner import ( "fmt" "os" - "strconv" "time" "gopkg.in/natefinch/lumberjack.v2" ) +// logger provides simple logging with timestamp type logger struct { logFunc func(string, ...interface{}) } @@ -17,6 +17,7 @@ func (l *logger) log(format string, args ...interface{}) { l.logFunc(format, args...) } +// setupLogger configures the rotating log file func (d *Daemon) setupLogger() (*lumberjack.Logger, *logger) { maxSizeMB := getEnvInt("BEADS_DAEMON_LOG_MAX_SIZE", 10) maxBackups := getEnvInt("BEADS_DAEMON_LOG_MAX_BACKUPS", 3) @@ -42,15 +43,18 @@ func (d *Daemon) setupLogger() (*lumberjack.Logger, *logger) { return logF, log } +// getEnvInt reads an integer from environment variable with a default value func getEnvInt(key string, defaultValue int) int { if val := os.Getenv(key); val != "" { - if parsed, err := strconv.Atoi(val); err == nil { + var parsed int + if _, err := fmt.Sscanf(val, "%d", &parsed); err == nil { return parsed } } return defaultValue } +// getEnvBool reads a boolean from environment variable with a default value func getEnvBool(key string, defaultValue bool) bool { if val := os.Getenv(key); val != "" { return val == "true" || val == "1" diff --git a/internal/daemonrunner/process.go b/internal/daemonrunner/process.go index 2aef591e..16bb0bf4 100644 --- a/internal/daemonrunner/process.go +++ b/internal/daemonrunner/process.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "os" "path/filepath" "time" @@ -36,35 +35,6 @@ func (l *DaemonLock) Close() error { return err } -func (d *Daemon) setupLock() (io.Closer, error) { - lock, err := acquireDaemonLock(d.cfg.BeadsDir, d.cfg.DBPath, d.Version) - if err != nil { - if err == ErrDaemonLocked { - d.log.log("Daemon already running (lock held), exiting") - } else { - d.log.log("Error acquiring daemon lock: %v", err) - } - return nil, err - } - - // Ensure PID file matches our PID - myPID := os.Getpid() - pidFile := d.cfg.PIDFile - // #nosec G304 - controlled path from config - if data, err := os.ReadFile(pidFile); err == nil { - var filePID int - if _, err := fmt.Sscanf(string(data), "%d", &filePID); err == nil && filePID != myPID { - d.log.log("PID file has wrong PID (expected %d, got %d), overwriting", myPID, filePID) - _ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600) - } - } else { - d.log.log("PID file missing after lock acquisition, creating") - _ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600) - } - - return lock, nil -} - // acquireDaemonLock attempts to acquire an exclusive lock on daemon.lock func acquireDaemonLock(beadsDir string, dbPath string, version string) (*DaemonLock, error) { lockPath := filepath.Join(beadsDir, "daemon.lock") diff --git a/internal/daemonrunner/rpc.go b/internal/daemonrunner/rpc.go new file mode 100644 index 00000000..a64f1f3d --- /dev/null +++ b/internal/daemonrunner/rpc.go @@ -0,0 +1,37 @@ +package daemonrunner + +import ( + "context" + "time" + + "github.com/steveyegge/beads/internal/rpc" +) + +// startRPCServer initializes and starts the RPC server +func (d *Daemon) startRPCServer(ctx context.Context) (*rpc.Server, chan error, error) { + // Sync daemon version with CLI version + rpc.ServerVersion = d.Version + + server := rpc.NewServer(d.cfg.SocketPath, d.store, d.cfg.WorkspacePath, d.cfg.DBPath) + serverErrChan := make(chan error, 1) + + go func() { + d.log.log("Starting RPC server: %s", d.cfg.SocketPath) + if err := server.Start(ctx); err != nil { + d.log.log("RPC server error: %v", err) + serverErrChan <- err + } + }() + + select { + case err := <-serverErrChan: + d.log.log("RPC server failed to start: %v", err) + return nil, nil, err + case <-server.WaitReady(): + d.log.log("RPC server ready (socket listening)") + case <-time.After(5 * time.Second): + d.log.log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)") + } + + return server, serverErrChan, nil +} diff --git a/internal/daemonrunner/signals_unix.go b/internal/daemonrunner/signals_unix.go new file mode 100644 index 00000000..8c03bd36 --- /dev/null +++ b/internal/daemonrunner/signals_unix.go @@ -0,0 +1,14 @@ +//go:build unix || linux || darwin + +package daemonrunner + +import ( + "os" + "syscall" +) + +var daemonSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP} + +func isReloadSignal(sig os.Signal) bool { + return sig == syscall.SIGHUP +} diff --git a/internal/daemonrunner/signals_windows.go b/internal/daemonrunner/signals_windows.go new file mode 100644 index 00000000..2e0713e2 --- /dev/null +++ b/internal/daemonrunner/signals_windows.go @@ -0,0 +1,14 @@ +//go:build windows + +package daemonrunner + +import ( + "os" + "syscall" +) + +var daemonSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} + +func isReloadSignal(os.Signal) bool { + return false +} diff --git a/internal/daemonrunner/sync.go b/internal/daemonrunner/sync.go new file mode 100644 index 00000000..ddbdf114 --- /dev/null +++ b/internal/daemonrunner/sync.go @@ -0,0 +1,243 @@ +package daemonrunner + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "os" + "os/signal" + "path/filepath" + "sort" + "time" + + "github.com/steveyegge/beads/internal/types" +) + +// runSyncLoop manages the main daemon event loop for sync operations +func (d *Daemon) runSyncLoop(ctx context.Context, serverErrChan chan error) error { + beadsDir := d.cfg.BeadsDir + jsonlPath := filepath.Join(filepath.Dir(beadsDir), "issues.jsonl") + + ticker := time.NewTicker(d.cfg.Interval) + defer ticker.Stop() + + doSync := func() { + syncCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + if err := d.exportToJSONL(syncCtx, jsonlPath); err != nil { + d.log.log("Export failed: %v", err) + return + } + d.log.log("Exported to JSONL") + + if d.cfg.AutoCommit { + hasChanges, err := gitHasChanges(syncCtx, jsonlPath) + if err != nil { + d.log.log("Error checking git status: %v", err) + return + } + + if hasChanges { + message := "bd daemon sync: " + time.Now().Format("2006-01-02 15:04:05") + if err := gitCommit(syncCtx, jsonlPath, message); err != nil { + d.log.log("Commit failed: %v", err) + return + } + d.log.log("Committed changes") + } + } + + if err := gitPull(syncCtx); err != nil { + d.log.log("Pull failed: %v", err) + return + } + d.log.log("Pulled from remote") + + beforeCount, err := d.countDBIssues(syncCtx) + if err != nil { + d.log.log("Failed to count issues before import: %v", err) + return + } + + if err := d.importFromJSONL(syncCtx, jsonlPath); err != nil { + d.log.log("Import failed: %v", err) + return + } + d.log.log("Imported from JSONL") + + afterCount, err := d.countDBIssues(syncCtx) + if err != nil { + d.log.log("Failed to count issues after import: %v", err) + return + } + + if err := d.validatePostImport(beforeCount, afterCount); err != nil { + d.log.log("Post-import validation failed: %v", err) + return + } + + if d.cfg.AutoPush && d.cfg.AutoCommit { + if err := gitPush(syncCtx); err != nil { + d.log.log("Push failed: %v", err) + return + } + d.log.log("Pushed to remote") + } + + d.log.log("Sync cycle complete") + } + + return d.runEventLoop(ctx, ticker, doSync, serverErrChan) +} + +// runEventLoop handles signals and periodic sync +func (d *Daemon) runEventLoop(ctx context.Context, ticker *time.Ticker, doSync func(), serverErrChan chan error) error { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, daemonSignals...) + defer signal.Stop(sigChan) + + for { + select { + case <-ticker.C: + if ctx.Err() != nil { + return nil + } + doSync() + case sig := <-sigChan: + if isReloadSignal(sig) { + d.log.log("Received reload signal, ignoring (daemon continues running)") + continue + } + d.log.log("Received signal %v, shutting down gracefully...", sig) + d.cancel() + if err := d.server.Stop(); err != nil { + d.log.log("Error stopping RPC server: %v", err) + } + return nil + case <-ctx.Done(): + d.log.log("Context canceled, shutting down") + if err := d.server.Stop(); err != nil { + d.log.log("Error stopping RPC server: %v", err) + } + return nil + case err := <-serverErrChan: + d.log.log("RPC server failed: %v", err) + d.cancel() + if err := d.server.Stop(); err != nil { + d.log.log("Error stopping RPC server: %v", err) + } + return err + } + } +} + +// exportToJSONL exports all issues to JSONL format +func (d *Daemon) exportToJSONL(ctx context.Context, jsonlPath string) error { + // Get all issues + issues, err := d.store.SearchIssues(ctx, "", types.IssueFilter{}) + if err != nil { + return fmt.Errorf("failed to get issues: %w", err) + } + + // Sort by ID for consistent output + sort.Slice(issues, func(i, j int) bool { + return issues[i].ID < issues[j].ID + }) + + // Populate dependencies for all issues + allDeps, err := d.store.GetAllDependencyRecords(ctx) + if err != nil { + return fmt.Errorf("failed to get dependencies: %w", err) + } + for _, issue := range issues { + issue.Dependencies = allDeps[issue.ID] + } + + // Populate labels for all issues + for _, issue := range issues { + labels, err := d.store.GetLabels(ctx, issue.ID) + if err != nil { + return fmt.Errorf("failed to get labels for %s: %w", issue.ID, err) + } + issue.Labels = labels + } + + // Populate comments for all issues + for _, issue := range issues { + comments, err := d.store.GetIssueComments(ctx, issue.ID) + if err != nil { + return fmt.Errorf("failed to get comments for %s: %w", issue.ID, err) + } + issue.Comments = comments + } + + // Write to temp file then rename for atomicity + tempFile := jsonlPath + ".tmp" + f, err := os.Create(tempFile) + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + defer f.Close() + + encoder := json.NewEncoder(f) + for _, issue := range issues { + if err := encoder.Encode(issue); err != nil { + return fmt.Errorf("failed to encode issue: %w", err) + } + } + + if err := f.Close(); err != nil { + return fmt.Errorf("failed to close temp file: %w", err) + } + + if err := os.Rename(tempFile, jsonlPath); err != nil { + return fmt.Errorf("failed to rename temp file: %w", err) + } + + return nil +} + +// importFromJSONL imports issues from JSONL format +// Note: This is a simplified implementation for the daemon +// The full implementation with conflict resolution is in cmd/bd/import.go +func (d *Daemon) importFromJSONL(ctx context.Context, jsonlPath string) error { + // For now, this is a placeholder that will be filled in later + // The daemon will use the same import logic as the CLI + // TODO: Extract import logic from cmd/bd/import.go to a shared package + return fmt.Errorf("importFromJSONL not yet implemented in daemon runner") +} + +// countDBIssues returns the count of issues in the database +func (d *Daemon) countDBIssues(ctx context.Context) (int, error) { + // Try fast path with COUNT(*) + type dbGetter interface { + GetDB() interface{} + } + + if getter, ok := d.store.(dbGetter); ok { + if db, ok := getter.GetDB().(*sql.DB); ok && db != nil { + var count int + err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM issues").Scan(&count) + if err == nil { + return count, nil + } + } + } + + // Fallback: load all issues and count them + issues, err := d.store.SearchIssues(ctx, "", types.IssueFilter{}) + if err != nil { + return 0, fmt.Errorf("failed to count database issues: %w", err) + } + return len(issues), nil +} + +// validatePostImport validates that the import didn't cause data loss +func (d *Daemon) validatePostImport(before, after int) error { + if after < before { + return fmt.Errorf("import reduced issue count: %d → %d (data loss detected!)", before, after) + } + return nil +}