Refactor daemon.go for testability and maintainability (bd-2b34)

- Split 1567-line daemon.go into 5 focused modules
- daemon_config.go (122 lines): config/path resolution
- daemon_lifecycle.go (451 lines): start/stop/status/health/metrics
- daemon_sync.go (510 lines): export/import/sync logic
- daemon_server.go (115 lines): RPC server setup
- daemon_logger.go (43 lines): logging utilities
- Reduced main daemon.go to 389 lines (75% reduction)
- All tests pass, improved code organization

Amp-Thread-ID: https://ampcode.com/threads/T-7504c501-f962-4b82-a6d9-8e33f547757d
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-11-01 19:19:47 -07:00
parent d80e7a5fd2
commit eb00ab8005
7 changed files with 1292 additions and 1224 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

122
cmd/bd/daemon_config.go Normal file
View File

@@ -0,0 +1,122 @@
package main
import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"

	"github.com/steveyegge/beads"
)
// getGlobalBeadsDir returns the global beads directory (~/.beads),
// creating it with owner-only permissions if it does not yet exist.
func getGlobalBeadsDir() (string, error) {
	home, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("cannot get home directory: %w", err)
	}
	dir := filepath.Join(home, ".beads")
	if mkErr := os.MkdirAll(dir, 0700); mkErr != nil {
		return "", fmt.Errorf("cannot create global beads directory: %w", mkErr)
	}
	return dir, nil
}
// ensureBeadsDir ensures the local beads directory exists (.beads in the
// current workspace) and returns its path. When the dbPath global is not
// yet set, it is resolved via beads.FindDatabasePath and cached for later
// use; if no database can be found, an error is returned rather than
// falling back to ~/.beads.
func ensureBeadsDir() (string, error) {
	var beadsDir string
	switch {
	case dbPath != "":
		beadsDir = filepath.Dir(dbPath)
	default:
		// Use the public API to find the database (same logic as other commands).
		foundDB := beads.FindDatabasePath()
		if foundDB == "" {
			// No database found - error out instead of falling back to ~/.beads.
			return "", fmt.Errorf("no database path configured (run 'bd init' or set BEADS_DB)")
		}
		dbPath = foundDB // cache it for later use
		beadsDir = filepath.Dir(foundDB)
	}
	if err := os.MkdirAll(beadsDir, 0700); err != nil {
		return "", fmt.Errorf("cannot create beads directory: %w", err)
	}
	return beadsDir, nil
}
// boolToFlag returns flag when condition is true and the empty string
// otherwise; handy when assembling optional CLI arguments.
func boolToFlag(condition bool, flag string) string {
	if !condition {
		return ""
	}
	return flag
}
// getEnvInt reads an integer from the named environment variable,
// returning defaultValue when the variable is unset, empty, or not a
// valid integer.
func getEnvInt(key string, defaultValue int) int {
	val := os.Getenv(key)
	if val == "" {
		return defaultValue
	}
	parsed, err := strconv.Atoi(val)
	if err != nil {
		return defaultValue
	}
	return parsed
}
// getEnvBool reads a boolean from the named environment variable with a
// default value. Truthy values are "1" and any capitalization of "true"
// (e.g. "true", "TRUE", "True"); every other non-empty value is false.
// When the variable is unset or empty, defaultValue is returned.
func getEnvBool(key string, defaultValue bool) bool {
	if val := os.Getenv(key); val != "" {
		// Accept "TRUE"/"True" too: the previous exact-match comparison
		// silently treated those as false. All previously-truthy values
		// remain truthy, so this is a backward-compatible generalization.
		return val == "1" || strings.EqualFold(val, "true")
	}
	return defaultValue
}
// getSocketPathForPID determines the Unix socket path corresponding to a
// given PID file. Global daemons always use ~/.beads/bd.sock; local
// daemons keep the socket next to the PID file.
func getSocketPathForPID(pidFile string, global bool) string {
	if !global {
		// Local daemon: socket lives in the same directory as the PID file.
		return filepath.Join(filepath.Dir(pidFile), "bd.sock")
	}
	home, _ := os.UserHomeDir()
	return filepath.Join(home, ".beads", "bd.sock")
}
// getPIDFilePath returns the path to the daemon PID file
func getPIDFilePath(global bool) (string, error) {
var beadsDir string
var err error
if global {
beadsDir, err = getGlobalBeadsDir()
} else {
beadsDir, err = ensureBeadsDir()
}
if err != nil {
return "", err
}
return filepath.Join(beadsDir, "daemon.pid"), nil
}
// getLogFilePath returns the path to the daemon log file
func getLogFilePath(userPath string, global bool) (string, error) {
if userPath != "" {
return userPath, nil
}
var beadsDir string
var err error
if global {
beadsDir, err = getGlobalBeadsDir()
} else {
beadsDir, err = ensureBeadsDir()
}
if err != nil {
return "", err
}
return filepath.Join(beadsDir, "daemon.log"), nil
}

451
cmd/bd/daemon_lifecycle.go Normal file
View File

@@ -0,0 +1,451 @@
package main
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/steveyegge/beads/internal/rpc"
)
// isDaemonRunning reports whether a daemon currently holds the lock for
// the beads directory containing pidFile and, if so, its PID.
func isDaemonRunning(pidFile string) (bool, int) {
	return tryDaemonLock(filepath.Dir(pidFile))
}
// formatUptime renders an uptime given in seconds as a short
// human-readable string using the two most significant units
// (seconds, m/s, h/m, or d/h).
func formatUptime(seconds float64) string {
	const (
		minute = 60
		hour   = 3600
		day    = 86400
	)
	switch {
	case seconds < minute:
		return fmt.Sprintf("%.1f seconds", seconds)
	case seconds < hour:
		total := int(seconds)
		return fmt.Sprintf("%dm %ds", total/minute, total%minute)
	case seconds < day:
		return fmt.Sprintf("%dh %dm", int(seconds/hour), int(seconds/minute)%minute)
	default:
		return fmt.Sprintf("%dd %dh", int(seconds/day), int(seconds/hour)%24)
	}
}
// showDaemonStatus displays the current daemon status on stdout. When
// the package-level jsonOutput flag is set it emits a JSON object via
// outputJSON instead of human-readable text. Never exits the process.
func showDaemonStatus(pidFile string, global bool) {
	if isRunning, pid := isDaemonRunning(pidFile); isRunning {
		scope := "local"
		if global {
			scope = "global"
		}
		// Approximate the start time from the PID file's mtime; best effort only.
		var started string
		if info, err := os.Stat(pidFile); err == nil {
			started = info.ModTime().Format("2006-01-02 15:04:05")
		}
		// Only report a log path that actually exists on disk.
		var logPath string
		if lp, err := getLogFilePath("", global); err == nil {
			if _, err := os.Stat(lp); err == nil {
				logPath = lp
			}
		}
		if jsonOutput {
			status := map[string]interface{}{
				"running": true,
				"pid":     pid,
				"scope":   scope,
			}
			// Optional fields are omitted from the JSON when unavailable.
			if started != "" {
				status["started"] = started
			}
			if logPath != "" {
				status["log_path"] = logPath
			}
			outputJSON(status)
			return
		}
		fmt.Printf("Daemon is running (PID %d, %s)\n", pid, scope)
		if started != "" {
			fmt.Printf(" Started: %s\n", started)
		}
		if logPath != "" {
			fmt.Printf(" Log: %s\n", logPath)
		}
	} else {
		if jsonOutput {
			outputJSON(map[string]interface{}{"running": false})
			return
		}
		fmt.Println("Daemon is not running")
	}
}
// showDaemonHealth queries the daemon's health endpoint over its RPC
// socket and prints the result (JSON when jsonOutput is set). Exits with
// status 1 when the daemon is unreachable, the health call fails, or the
// daemon reports status "unhealthy".
func showDaemonHealth(global bool) {
	// Resolve the daemon socket: ~/.beads/bd.sock for global daemons,
	// <beadsDir>/bd.sock for local ones.
	var socketPath string
	if global {
		home, err := os.UserHomeDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: cannot get home directory: %v\n", err)
			os.Exit(1)
		}
		socketPath = filepath.Join(home, ".beads", "bd.sock")
	} else {
		beadsDir, err := ensureBeadsDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
			os.Exit(1)
		}
		socketPath = filepath.Join(beadsDir, "bd.sock")
	}
	client, err := rpc.TryConnect(socketPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error connecting to daemon: %v\n", err)
		os.Exit(1)
	}
	// A nil client with a nil error is handled as "no daemon listening".
	if client == nil {
		fmt.Println("Daemon is not running")
		os.Exit(1)
	}
	defer func() { _ = client.Close() }()
	health, err := client.Health()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error checking health: %v\n", err)
		os.Exit(1)
	}
	if jsonOutput {
		data, _ := json.MarshalIndent(health, "", " ")
		fmt.Println(string(data))
		return
	}
	fmt.Printf("Daemon Health: %s\n", strings.ToUpper(health.Status))
	fmt.Printf(" Version: %s\n", health.Version)
	fmt.Printf(" Uptime: %s\n", formatUptime(health.Uptime))
	fmt.Printf(" DB Response Time: %.2f ms\n", health.DBResponseTime)
	if health.Error != "" {
		fmt.Printf(" Error: %s\n", health.Error)
	}
	// Propagate an unhealthy status through the exit code for scripting.
	if health.Status == "unhealthy" {
		os.Exit(1)
	}
}
// showDaemonMetrics displays daemon metrics fetched over the RPC socket:
// JSON when jsonOutput is set, otherwise a human-readable report of
// uptime, connection counts, system stats, and per-operation latency.
// Exits with status 1 when the daemon is unreachable or the call fails.
func showDaemonMetrics(global bool) {
	// Resolve the daemon socket: ~/.beads/bd.sock for global daemons,
	// <beadsDir>/bd.sock for local ones.
	var socketPath string
	if global {
		home, err := os.UserHomeDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: cannot get home directory: %v\n", err)
			os.Exit(1)
		}
		socketPath = filepath.Join(home, ".beads", "bd.sock")
	} else {
		beadsDir, err := ensureBeadsDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
			os.Exit(1)
		}
		socketPath = filepath.Join(beadsDir, "bd.sock")
	}
	client, err := rpc.TryConnect(socketPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error connecting to daemon: %v\n", err)
		os.Exit(1)
	}
	// A nil client with a nil error is handled as "no daemon listening".
	if client == nil {
		fmt.Println("Daemon is not running")
		os.Exit(1)
	}
	defer func() { _ = client.Close() }()
	metrics, err := client.Metrics()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error fetching metrics: %v\n", err)
		os.Exit(1)
	}
	if jsonOutput {
		data, _ := json.MarshalIndent(metrics, "", " ")
		fmt.Println(string(data))
		return
	}
	// Human-readable output
	fmt.Printf("Daemon Metrics\n")
	fmt.Printf("==============\n\n")
	fmt.Printf("Uptime: %.1f seconds (%.1f minutes)\n", metrics.UptimeSeconds, metrics.UptimeSeconds/60)
	fmt.Printf("Timestamp: %s\n\n", metrics.Timestamp.Format(time.RFC3339))
	// Connection metrics
	fmt.Printf("Connection Metrics:\n")
	fmt.Printf(" Total: %d\n", metrics.TotalConns)
	fmt.Printf(" Active: %d\n", metrics.ActiveConns)
	fmt.Printf(" Rejected: %d\n\n", metrics.RejectedConns)
	// System metrics
	fmt.Printf("System Metrics:\n")
	fmt.Printf(" Memory Alloc: %d MB\n", metrics.MemoryAllocMB)
	fmt.Printf(" Memory Sys: %d MB\n", metrics.MemorySysMB)
	fmt.Printf(" Goroutines: %d\n\n", metrics.GoroutineCount)
	// Per-operation request counts and latency percentiles, when present.
	if len(metrics.Operations) > 0 {
		fmt.Printf("Operation Metrics:\n")
		for _, op := range metrics.Operations {
			fmt.Printf("\n %s:\n", op.Operation)
			fmt.Printf(" Total Requests: %d\n", op.TotalCount)
			fmt.Printf(" Successful: %d\n", op.SuccessCount)
			fmt.Printf(" Errors: %d\n", op.ErrorCount)
			// The latency block is only printed when an average was recorded.
			if op.Latency.AvgMS > 0 {
				fmt.Printf(" Latency:\n")
				fmt.Printf(" Min: %.3f ms\n", op.Latency.MinMS)
				fmt.Printf(" Avg: %.3f ms\n", op.Latency.AvgMS)
				fmt.Printf(" P50: %.3f ms\n", op.Latency.P50MS)
				fmt.Printf(" P95: %.3f ms\n", op.Latency.P95MS)
				fmt.Printf(" P99: %.3f ms\n", op.Latency.P99MS)
				fmt.Printf(" Max: %.3f ms\n", op.Latency.MaxMS)
			}
		}
	}
}
// migrateToGlobalDaemon migrates from a per-workspace (local) daemon to
// the shared global daemon: it stops any running local daemon, then
// starts a global one unless it is already running. Exits non-zero when
// the global daemon cannot be started or confirmed.
func migrateToGlobalDaemon() {
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: cannot get home directory: %v\n", err)
		os.Exit(1)
	}
	localPIDFile := filepath.Join(".beads", "daemon.pid")
	globalPIDFile := filepath.Join(home, ".beads", "daemon.pid")
	// Stop the local daemon first so it releases its lock before the
	// global one takes over.
	localRunning, localPID := isDaemonRunning(localPIDFile)
	if !localRunning {
		fmt.Println("No local daemon is running")
	} else {
		fmt.Printf("Stopping local daemon (PID %d)...\n", localPID)
		stopDaemon(localPIDFile)
	}
	// Nothing to do if a global daemon already holds the global lock.
	globalRunning, globalPID := isDaemonRunning(globalPIDFile)
	if globalRunning {
		fmt.Printf("Global daemon already running (PID %d)\n", globalPID)
		return
	}
	// Spawn the global daemon as a detached child of this process.
	fmt.Println("Starting global daemon...")
	binPath, err := os.Executable()
	if err != nil {
		// Fall back to argv[0] if the executable path cannot be resolved.
		binPath = os.Args[0]
	}
	cmd := exec.Command(binPath, "daemon", "--global") // #nosec G204 - bd daemon command from trusted binary
	// Detach stdio from the child; daemon output goes to its own log file.
	devNull, err := os.OpenFile(os.DevNull, os.O_RDWR, 0)
	if err == nil {
		cmd.Stdout = devNull
		cmd.Stderr = devNull
		cmd.Stdin = devNull
		defer func() { _ = devNull.Close() }()
	}
	configureDaemonProcess(cmd)
	if err := cmd.Start(); err != nil {
		fmt.Fprintf(os.Stderr, "Error: failed to start global daemon: %v\n", err)
		os.Exit(1)
	}
	// Reap the child in the background to avoid leaving a zombie.
	go func() { _ = cmd.Wait() }()
	// Give the daemon a moment to come up, then verify via its lock/PID file.
	time.Sleep(2 * time.Second)
	if isRunning, pid := isDaemonRunning(globalPIDFile); isRunning {
		fmt.Printf("Global daemon started successfully (PID %d)\n", pid)
		fmt.Println()
		fmt.Println("Migration complete! The global daemon will now serve all your beads repositories.")
		fmt.Println("Set BEADS_PREFER_GLOBAL_DAEMON=1 in your shell to make this permanent.")
	} else {
		fmt.Fprintf(os.Stderr, "Error: global daemon failed to start\n")
		os.Exit(1)
	}
}
// stopDaemon stops a running daemon: it sends the polite stop signal,
// waits up to 5 seconds for the daemon lock to be released, and only
// then force-kills the process and removes the PID file. Exits non-zero
// when the process cannot be found or signaled.
func stopDaemon(pidFile string) {
	isRunning, pid := isDaemonRunning(pidFile)
	if !isRunning {
		fmt.Println("Daemon is not running")
		return
	}
	fmt.Printf("Stopping daemon (PID %d)...\n", pid)
	// Per the os package docs, FindProcess always succeeds on Unix; the
	// error path matters on other platforms.
	process, err := os.FindProcess(pid)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error finding process: %v\n", err)
		os.Exit(1)
	}
	if err := sendStopSignal(process); err != nil {
		fmt.Fprintf(os.Stderr, "Error signaling daemon: %v\n", err)
		os.Exit(1)
	}
	// Poll for graceful shutdown: 50 x 100ms = 5 seconds total.
	for i := 0; i < 50; i++ {
		time.Sleep(100 * time.Millisecond)
		if isRunning, _ := isDaemonRunning(pidFile); !isRunning {
			fmt.Println("Daemon stopped")
			return
		}
	}
	fmt.Fprintf(os.Stderr, "Warning: daemon did not stop after 5 seconds, forcing termination\n")
	// Check one more time before killing the process to avoid a race.
	if isRunning, _ := isDaemonRunning(pidFile); !isRunning {
		fmt.Println("Daemon stopped")
		return
	}
	if err := process.Kill(); err != nil {
		// Ignore "process already finished" errors
		if !strings.Contains(err.Error(), "process already finished") {
			fmt.Fprintf(os.Stderr, "Error killing process: %v\n", err)
		}
	}
	// A killed daemon cannot clean up after itself, so remove its PID file here.
	_ = os.Remove(pidFile)
	fmt.Println("Daemon killed")
}
// startDaemon starts the daemon in the background by re-executing this
// binary with BD_DAEMON_FOREGROUND=1 in the environment, then waits for
// the child to confirm startup by writing its PID file. When
// BD_DAEMON_FOREGROUND=1 is already set (i.e. we ARE the child), the
// daemon loop is run directly in this process instead.
func startDaemon(interval time.Duration, autoCommit, autoPush bool, logFile, pidFile string, global bool) {
	logPath, err := getLogFilePath(logFile, global)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		os.Exit(1)
	}
	// Child path: run the loop in the foreground of this process.
	if os.Getenv("BD_DAEMON_FOREGROUND") == "1" {
		runDaemonLoop(interval, autoCommit, autoPush, logPath, pidFile, global)
		return
	}
	exe, err := os.Executable()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: cannot resolve executable path: %v\n", err)
		os.Exit(1)
	}
	// Rebuild the daemon command line from the flags we were given.
	args := []string{"daemon",
		"--interval", interval.String(),
	}
	if autoCommit {
		args = append(args, "--auto-commit")
	}
	if autoPush {
		args = append(args, "--auto-push")
	}
	if logFile != "" {
		args = append(args, "--log", logFile)
	}
	if global {
		args = append(args, "--global")
	}
	cmd := exec.Command(exe, args...) // #nosec G204 - bd daemon command from trusted binary
	// BD_DAEMON_FOREGROUND=1 makes the child take the runDaemonLoop path above.
	cmd.Env = append(os.Environ(), "BD_DAEMON_FOREGROUND=1")
	configureDaemonProcess(cmd)
	// Detach stdio; the daemon writes to its log file instead.
	devNull, err := os.OpenFile(os.DevNull, os.O_RDWR, 0)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error opening /dev/null: %v\n", err)
		os.Exit(1)
	}
	defer func() { _ = devNull.Close() }()
	cmd.Stdin = devNull
	cmd.Stdout = devNull
	cmd.Stderr = devNull
	if err := cmd.Start(); err != nil {
		fmt.Fprintf(os.Stderr, "Error starting daemon: %v\n", err)
		os.Exit(1)
	}
	expectedPID := cmd.Process.Pid
	// Release the child so it is no longer tied to this process.
	if err := cmd.Process.Release(); err != nil {
		fmt.Fprintf(os.Stderr, "Warning: failed to release process: %v\n", err)
	}
	// Wait up to 2 seconds (20 x 100ms) for the child to write its own
	// PID into the PID file as startup confirmation.
	for i := 0; i < 20; i++ {
		time.Sleep(100 * time.Millisecond)
		// #nosec G304 - controlled path from config
		if data, err := os.ReadFile(pidFile); err == nil {
			if pid, err := strconv.Atoi(strings.TrimSpace(string(data))); err == nil && pid == expectedPID {
				fmt.Printf("Daemon started (PID %d)\n", expectedPID)
				return
			}
		}
	}
	fmt.Fprintf(os.Stderr, "Warning: daemon may have failed to start (PID file not confirmed)\n")
	fmt.Fprintf(os.Stderr, "Check log file: %s\n", logPath)
}
// setupDaemonLock acquires the daemon lock for the directory containing
// pidFile and verifies that the PID file records our own PID, rewriting
// it when it is missing or stale. Returns the held lock, or an error
// when the lock is already held (ErrDaemonLocked) or cannot be acquired.
func setupDaemonLock(pidFile string, dbPath string, log daemonLogger) (*DaemonLock, error) {
	beadsDir := filepath.Dir(pidFile)
	lock, err := acquireDaemonLock(beadsDir, dbPath)
	if err != nil {
		if err == ErrDaemonLocked {
			log.log("Daemon already running (lock held), exiting")
		} else {
			log.log("Error acquiring daemon lock: %v", err)
		}
		return nil, err
	}
	myPID := os.Getpid()
	// #nosec G304 - controlled path from config
	if data, err := os.ReadFile(pidFile); err == nil {
		if pid, err := strconv.Atoi(strings.TrimSpace(string(data))); err == nil && pid == myPID {
			// PID file is correct, continue
		} else {
			// NOTE(review): when the file is unparseable, Atoi yields 0, so
			// "got 0" in this message can also mean "unreadable PID file".
			log.log("PID file has wrong PID (expected %d, got %d), overwriting", myPID, pid)
			_ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600)
		}
	} else {
		log.log("PID file missing after lock acquisition, creating")
		_ = os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", myPID)), 0600)
	}
	return lock, nil
}

43
cmd/bd/daemon_logger.go Normal file
View File

@@ -0,0 +1,43 @@
package main
import (
"fmt"
"time"
"gopkg.in/natefinch/lumberjack.v2"
)
// daemonLogger wraps a logging function for the daemon so callers can
// log through a small value type without knowing the underlying sink.
type daemonLogger struct {
	// logFunc receives a printf-style format string and its arguments.
	logFunc func(string, ...interface{})
}

// log forwards the printf-style message to the wrapped logFunc.
func (d *daemonLogger) log(format string, args ...interface{}) {
	d.logFunc(format, args...)
}
// setupDaemonLogger creates a rotating log writer for the daemon plus a
// daemonLogger that prefixes each line with a timestamp. Rotation limits
// come from the BEADS_DAEMON_LOG_* environment variables, with defaults
// of 10MB per file, 3 backups, 7 days retention, and compression on.
func setupDaemonLogger(logPath string) (*lumberjack.Logger, daemonLogger) {
	rotator := &lumberjack.Logger{
		Filename:   logPath,
		MaxSize:    getEnvInt("BEADS_DAEMON_LOG_MAX_SIZE", 10),
		MaxBackups: getEnvInt("BEADS_DAEMON_LOG_MAX_BACKUPS", 3),
		MaxAge:     getEnvInt("BEADS_DAEMON_LOG_MAX_AGE", 7),
		Compress:   getEnvBool("BEADS_DAEMON_LOG_COMPRESS", true),
	}
	writeLine := func(format string, args ...interface{}) {
		stamp := time.Now().Format("2006-01-02 15:04:05")
		_, _ = fmt.Fprintf(rotator, "[%s] %s\n", stamp, fmt.Sprintf(format, args...))
	}
	return rotator, daemonLogger{logFunc: writeLine}
}

115
cmd/bd/daemon_server.go Normal file
View File

@@ -0,0 +1,115 @@
package main
import (
"context"
"os"
"os/signal"
"path/filepath"
"time"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/storage"
)
// startRPCServer initializes and starts the RPC server on socketPath in
// a background goroutine, then blocks until the server either fails
// immediately, signals ready, or a 5-second grace period elapses (in
// which case startup proceeds optimistically with a warning). The
// returned buffered error channel reports a later asynchronous failure.
func startRPCServer(ctx context.Context, socketPath string, store storage.Storage, workspacePath string, dbPath string, log daemonLogger) (*rpc.Server, chan error, error) {
	// Sync daemon version with CLI version
	rpc.ServerVersion = Version
	server := rpc.NewServer(socketPath, store, workspacePath, dbPath)
	// Capacity 1 so the goroutine never blocks delivering its error.
	serverErrChan := make(chan error, 1)
	go func() {
		log.log("Starting RPC server: %s", socketPath)
		if err := server.Start(ctx); err != nil {
			log.log("RPC server error: %v", err)
			serverErrChan <- err
		}
	}()
	// Race the three startup outcomes: immediate failure, ready, timeout.
	select {
	case err := <-serverErrChan:
		log.log("RPC server failed to start: %v", err)
		return nil, nil, err
	case <-server.WaitReady():
		log.log("RPC server ready (socket listening)")
	case <-time.After(5 * time.Second):
		log.log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)")
	}
	return server, serverErrChan, nil
}
// runGlobalDaemon runs the global routing daemon: an RPC server on
// ~/.beads/bd.sock with no store of its own (nil storage), serving until
// a daemon signal arrives. Exits the process when the global beads
// directory cannot be resolved.
func runGlobalDaemon(log daemonLogger) {
	globalDir, err := getGlobalBeadsDir()
	if err != nil {
		log.log("Error: cannot get global beads directory: %v", err)
		os.Exit(1)
	}
	socketPath := filepath.Join(globalDir, "bd.sock")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// startRPCServer already logs its own failures; just bail out here.
	server, _, err := startRPCServer(ctx, socketPath, nil, globalDir, "", log)
	if err != nil {
		return
	}
	// Block until a shutdown signal arrives.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, daemonSignals...)
	defer signal.Stop(sigChan)
	sig := <-sigChan
	log.log("Received signal: %v", sig)
	log.log("Shutting down global daemon...")
	cancel()
	if err := server.Stop(); err != nil {
		log.log("Error stopping server: %v", err)
	}
	log.log("Global daemon stopped")
}
// runEventLoop runs the daemon event loop (polling mode): it invokes
// doSync on every ticker fire and returns when a shutdown signal
// arrives, the context is canceled, or the RPC server reports a fatal
// error. Reload signals are deliberately ignored. The RPC server is
// stopped on every exit path.
func runEventLoop(ctx context.Context, cancel context.CancelFunc, ticker *time.Ticker, doSync func(), server *rpc.Server, serverErrChan chan error, log daemonLogger) {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, daemonSignals...)
	defer signal.Stop(sigChan)
	for {
		select {
		case <-ticker.C:
			// Skip the sync if shutdown began while we were waiting.
			if ctx.Err() != nil {
				return
			}
			doSync()
		case sig := <-sigChan:
			if isReloadSignal(sig) {
				log.log("Received reload signal, ignoring (daemon continues running)")
				continue
			}
			log.log("Received signal %v, shutting down gracefully...", sig)
			cancel()
			if err := server.Stop(); err != nil {
				log.log("Error stopping RPC server: %v", err)
			}
			return
		case <-ctx.Done():
			log.log("Context canceled, shutting down")
			if err := server.Stop(); err != nil {
				log.log("Error stopping RPC server: %v", err)
			}
			return
		case err := <-serverErrChan:
			log.log("RPC server failed: %v", err)
			cancel()
			if err := server.Stop(); err != nil {
				log.log("Error stopping RPC server: %v", err)
			}
			return
		}
	}
}

510
cmd/bd/daemon_sync.go Normal file
View File

@@ -0,0 +1,510 @@
package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"time"
"github.com/steveyegge/beads"
"github.com/steveyegge/beads/internal/storage"
"github.com/steveyegge/beads/internal/types"
)
// exportToJSONLWithStore exports all issues in store to jsonlPath as
// JSON Lines (one issue per line), sorted by issue ID, with each issue's
// dependencies, labels, and comments populated so every line is
// self-contained.
//
// The write is atomic: data is written to a temp file in the target
// directory, fsynced, and then renamed over jsonlPath, so readers never
// observe a partially written file. As a safety valve, an empty database
// is never exported over a non-empty JSONL file (that would lose data).
func exportToJSONLWithStore(ctx context.Context, store storage.Storage, jsonlPath string) error {
	// Fetch every issue (empty query + empty filter = all issues).
	issues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
	if err != nil {
		return fmt.Errorf("failed to get issues: %w", err)
	}

	// Safety check: prevent exporting an empty database over non-empty JSONL.
	if len(issues) == 0 {
		existingCount, err := countIssuesInJSONL(jsonlPath)
		if err != nil {
			// A missing file is fine - there is nothing to clobber yet.
			if !os.IsNotExist(err) {
				return fmt.Errorf("warning: failed to read existing JSONL: %w", err)
			}
		} else if existingCount > 0 {
			return fmt.Errorf("refusing to export empty database over non-empty JSONL file (database: 0 issues, JSONL: %d issues). This would result in data loss", existingCount)
		}
	}

	// Sort by ID for deterministic, diff-friendly output.
	sort.Slice(issues, func(i, j int) bool {
		return issues[i].ID < issues[j].ID
	})

	// Attach dependencies, labels, and comments to each issue.
	allDeps, err := store.GetAllDependencyRecords(ctx)
	if err != nil {
		return fmt.Errorf("failed to get dependencies: %w", err)
	}
	for _, issue := range issues {
		issue.Dependencies = allDeps[issue.ID]

		labels, err := store.GetLabels(ctx, issue.ID)
		if err != nil {
			return fmt.Errorf("failed to get labels for %s: %w", issue.ID, err)
		}
		issue.Labels = labels

		comments, err := store.GetIssueComments(ctx, issue.ID)
		if err != nil {
			return fmt.Errorf("failed to get comments for %s: %w", issue.ID, err)
		}
		issue.Comments = comments
	}

	// Write to a temp file in the same directory so the final rename is
	// atomic (rename across filesystems is not).
	dir := filepath.Dir(jsonlPath)
	base := filepath.Base(jsonlPath)
	tempFile, err := os.CreateTemp(dir, base+".tmp.*")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()
	// Best-effort cleanup on any early return; after a successful rename
	// both calls are harmless no-ops (double close / missing file).
	defer func() {
		_ = tempFile.Close()
		_ = os.Remove(tempPath)
	}()

	// Write one JSON object per line.
	for _, issue := range issues {
		data, err := json.Marshal(issue)
		if err != nil {
			return fmt.Errorf("failed to marshal issue %s: %w", issue.ID, err)
		}
		if _, err := tempFile.Write(data); err != nil {
			return fmt.Errorf("failed to write issue %s: %w", issue.ID, err)
		}
		if _, err := tempFile.WriteString("\n"); err != nil {
			return fmt.Errorf("failed to write newline: %w", err)
		}
	}

	// Flush to stable storage before the rename; without this, a crash
	// could leave jsonlPath pointing at data that was never persisted.
	if err := tempFile.Sync(); err != nil {
		return fmt.Errorf("failed to sync temp file: %w", err)
	}
	// Close before rename (required on some platforms).
	if err := tempFile.Close(); err != nil {
		return fmt.Errorf("failed to close temp file: %w", err)
	}
	// Atomic rename
	if err := os.Rename(tempPath, jsonlPath); err != nil {
		return fmt.Errorf("failed to rename temp file: %w", err)
	}
	return nil
}
// importToJSONLWithStore imports issues from the JSONL file at jsonlPath
// into store via the shared import pipeline with auto-conflict
// resolution. Unparseable lines are reported to stderr and skipped so a
// single corrupt line cannot abort the whole import.
func importToJSONLWithStore(ctx context.Context, store storage.Storage, jsonlPath string) error {
	file, err := os.Open(jsonlPath) // #nosec G304 - controlled path from config
	if err != nil {
		return fmt.Errorf("failed to open JSONL: %w", err)
	}
	defer file.Close()

	// Parse all issues, one JSON object per line.
	var issues []*types.Issue
	scanner := bufio.NewScanner(file)
	// Issues with long descriptions/comments can exceed bufio.Scanner's
	// default 64KB token limit, which would abort the scan with
	// bufio.ErrTooLong; allow lines up to 16MB.
	scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
	lineNum := 0
	for scanner.Scan() {
		lineNum++
		line := scanner.Text()
		// Skip empty lines
		if line == "" {
			continue
		}
		var issue types.Issue
		if err := json.Unmarshal([]byte(line), &issue); err != nil {
			// Log error but continue - don't fail entire import
			fmt.Fprintf(os.Stderr, "Warning: failed to parse JSONL line %d: %v\n", lineNum, err)
			continue
		}
		issues = append(issues, &issue)
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("failed to read JSONL: %w", err)
	}

	// Use existing import logic with auto-conflict resolution
	opts := ImportOptions{
		DryRun:               false,
		SkipUpdate:           false,
		Strict:               false,
		SkipPrefixValidation: true, // Skip prefix validation for auto-import
	}
	_, err = importIssuesCore(ctx, "", store, issues, opts)
	return err
}
// validateDatabaseFingerprint checks that the database belongs to this
// repository by comparing the stored "repo_id" metadata against the ID
// computed for the current repository. A legacy database (no repo_id)
// or a mismatched ID is a hard error with remediation steps; failure to
// compute the current ID is only a logged warning.
func validateDatabaseFingerprint(store storage.Storage, log *daemonLogger) error {
	ctx := context.Background()
	// Get stored repo ID; a missing key is tolerated (legacy database).
	storedRepoID, err := store.GetMetadata(ctx, "repo_id")
	if err != nil && err.Error() != "metadata key not found: repo_id" {
		return fmt.Errorf("failed to read repo_id: %w", err)
	}
	// If no repo_id, this is a legacy database - require explicit migration
	if storedRepoID == "" {
		return fmt.Errorf(`
LEGACY DATABASE DETECTED!
This database was created before version 0.17.5 and lacks a repository fingerprint.
To continue using this database, you must explicitly set its repository ID:
bd migrate --update-repo-id
This ensures the database is bound to this repository and prevents accidental
database sharing between different repositories.
If this is a fresh clone, run:
rm -rf .beads && bd init
Note: Auto-claiming legacy databases is intentionally disabled to prevent
silent corruption when databases are copied between repositories.
`)
	}
	// Validate repo ID matches current repository
	currentRepoID, err := beads.ComputeRepoID()
	if err != nil {
		// Cannot verify without a current ID; warn but do not block startup.
		log.log("Warning: could not compute current repository ID: %v", err)
		return nil
	}
	if storedRepoID != currentRepoID {
		return fmt.Errorf(`
DATABASE MISMATCH DETECTED!
This database belongs to a different repository:
Database repo ID: %s
Current repo ID: %s
This usually means:
1. You copied a .beads directory from another repo (don't do this!)
2. Git remote URL changed (run 'bd migrate --update-repo-id')
3. Database corruption
4. bd was upgraded and URL canonicalization changed
Solutions:
- If remote URL changed: bd migrate --update-repo-id
- If bd was upgraded: bd migrate --update-repo-id
- If wrong database: rm -rf .beads && bd init
- If correct database: BEADS_IGNORE_REPO_MISMATCH=1 bd daemon
(Warning: This can cause data corruption across clones!)
`, storedRepoID[:8], currentRepoID[:8])
	}
	// Only the first 8 characters of the ID are logged, as in the messages above.
	log.log("Repository fingerprint validated: %s", currentRepoID[:8])
	return nil
}
// createExportFunc creates a function that only exports the database to
// JSONL and optionally commits/pushes (no git pull or import). Used for
// mutation events. The returned closure logs its progress, aborts (but
// does not retry) on any failure, and bounds each run with a 30-second
// timeout derived from ctx.
func createExportFunc(ctx context.Context, store storage.Storage, autoCommit, autoPush bool, log daemonLogger) func() {
	return func() {
		exportCtx, exportCancel := context.WithTimeout(ctx, 30*time.Second)
		defer exportCancel()
		log.log("Starting export...")
		jsonlPath := findJSONLPath()
		if jsonlPath == "" {
			log.log("Error: JSONL path not found")
			return
		}
		// Skip entirely when another process holds an exclusive lock on
		// the beads directory (or the lock state cannot be determined).
		beadsDir := filepath.Dir(jsonlPath)
		skip, holder, err := types.ShouldSkipDatabase(beadsDir)
		if skip {
			if err != nil {
				log.log("Skipping export (lock check failed: %v)", err)
			} else {
				log.log("Skipping export (locked by %s)", holder)
			}
			return
		}
		// A non-empty holder without skip indicates a stale lock was cleared.
		if holder != "" {
			log.log("Removed stale lock (%s), proceeding", holder)
		}
		// Pre-export validation
		if err := validatePreExport(exportCtx, store, jsonlPath); err != nil {
			log.log("Pre-export validation failed: %v", err)
			return
		}
		// Export to JSONL
		if err := exportToJSONLWithStore(exportCtx, store, jsonlPath); err != nil {
			log.log("Export failed: %v", err)
			return
		}
		log.log("Exported to JSONL")
		// Auto-commit if enabled
		if autoCommit {
			hasChanges, err := gitHasChanges(exportCtx, jsonlPath)
			if err != nil {
				log.log("Error checking git status: %v", err)
				return
			}
			if hasChanges {
				message := fmt.Sprintf("bd daemon export: %s", time.Now().Format("2006-01-02 15:04:05"))
				if err := gitCommit(exportCtx, jsonlPath, message); err != nil {
					log.log("Commit failed: %v", err)
					return
				}
				log.log("Committed changes")
				// Auto-push only runs after a successful commit.
				if autoPush {
					if err := gitPush(exportCtx); err != nil {
						log.log("Push failed: %v", err)
						return
					}
					log.log("Pushed to remote")
				}
			}
		}
		log.log("Export complete")
	}
}
// createAutoImportFunc creates a function that pulls from git and
// imports JSONL into the database (no export). Used for file system
// change events. Each run is bounded by a 1-minute timeout; all
// failures are logged and abort that run without retrying.
func createAutoImportFunc(ctx context.Context, store storage.Storage, log daemonLogger) func() {
	return func() {
		importCtx, importCancel := context.WithTimeout(ctx, 1*time.Minute)
		defer importCancel()
		log.log("Starting auto-import...")
		jsonlPath := findJSONLPath()
		if jsonlPath == "" {
			log.log("Error: JSONL path not found")
			return
		}
		// Skip when another process holds an exclusive lock.
		beadsDir := filepath.Dir(jsonlPath)
		skip, holder, err := types.ShouldSkipDatabase(beadsDir)
		if skip {
			if err != nil {
				log.log("Skipping import (lock check failed: %v)", err)
			} else {
				log.log("Skipping import (locked by %s)", holder)
			}
			return
		}
		if holder != "" {
			log.log("Removed stale lock (%s), proceeding", holder)
		}
		// Check JSONL modification time to avoid redundant imports
		jsonlInfo, err := os.Stat(jsonlPath)
		if err != nil {
			log.log("Failed to stat JSONL: %v", err)
			return
		}
		// NOTE(review): the database filename is hardcoded here; confirm it
		// always matches the configured database path used elsewhere.
		dbPath := filepath.Join(beadsDir, "beads.db")
		dbInfo, err := os.Stat(dbPath)
		if err != nil {
			log.log("Failed to stat database: %v", err)
			return
		}
		// Skip if JSONL is older than database (nothing new to import).
		// NOTE(review): this mtime check runs BEFORE the git pull below, so
		// changes that only arrive with that pull are not imported until a
		// later run observes the updated mtime - confirm this is intended.
		if !jsonlInfo.ModTime().After(dbInfo.ModTime()) {
			log.log("Skipping import: JSONL not newer than database")
			return
		}
		// Pull from git
		if err := gitPull(importCtx); err != nil {
			log.log("Pull failed: %v", err)
			return
		}
		log.log("Pulled from remote")
		// Count issues before import so the result can be validated below.
		beforeCount, err := countDBIssues(importCtx, store)
		if err != nil {
			log.log("Failed to count issues before import: %v", err)
			return
		}
		// Import from JSONL
		if err := importToJSONLWithStore(importCtx, store, jsonlPath); err != nil {
			log.log("Import failed: %v", err)
			return
		}
		log.log("Imported from JSONL")
		// Validate the import against the before/after issue counts.
		afterCount, err := countDBIssues(importCtx, store)
		if err != nil {
			log.log("Failed to count issues after import: %v", err)
			return
		}
		if err := validatePostImport(beforeCount, afterCount); err != nil {
			log.log("Post-import validation failed: %v", err)
			return
		}
		log.log("Auto-import complete")
	}
}
// createSyncFunc creates a function that performs a full sync cycle:
// integrity checks, export to JSONL, optional commit, git pull, import
// back from JSONL, post-import validation, and optional push. Each run
// is bounded by a 2-minute timeout; any failure is logged and aborts
// the rest of the cycle.
func createSyncFunc(ctx context.Context, store storage.Storage, autoCommit, autoPush bool, log daemonLogger) func() {
	return func() {
		syncCtx, syncCancel := context.WithTimeout(ctx, 2*time.Minute)
		defer syncCancel()
		log.log("Starting sync cycle...")
		jsonlPath := findJSONLPath()
		if jsonlPath == "" {
			log.log("Error: JSONL path not found")
			return
		}
		// Check for exclusive lock before processing database
		beadsDir := filepath.Dir(jsonlPath)
		skip, holder, err := types.ShouldSkipDatabase(beadsDir)
		if skip {
			if err != nil {
				log.log("Skipping database (lock check failed: %v)", err)
			} else {
				log.log("Skipping database (locked by %s)", holder)
			}
			return
		}
		if holder != "" {
			log.log("Removed stale lock (%s), proceeding with sync", holder)
		}
		// Integrity check: validate before export
		if err := validatePreExport(syncCtx, store, jsonlPath); err != nil {
			log.log("Pre-export validation failed: %v", err)
			return
		}
		// Check for duplicate IDs (database corruption)
		if err := checkDuplicateIDs(syncCtx, store); err != nil {
			log.log("Duplicate ID check failed: %v", err)
			return
		}
		// Check for orphaned dependencies (warns but doesn't fail)
		if orphaned, err := checkOrphanedDeps(syncCtx, store); err != nil {
			log.log("Orphaned dependency check failed: %v", err)
		} else if len(orphaned) > 0 {
			log.log("Found %d orphaned dependencies: %v", len(orphaned), orphaned)
		}
		if err := exportToJSONLWithStore(syncCtx, store, jsonlPath); err != nil {
			log.log("Export failed: %v", err)
			return
		}
		log.log("Exported to JSONL")
		// Commit local JSONL changes before pulling.
		if autoCommit {
			hasChanges, err := gitHasChanges(syncCtx, jsonlPath)
			if err != nil {
				log.log("Error checking git status: %v", err)
				return
			}
			if hasChanges {
				message := fmt.Sprintf("bd daemon sync: %s", time.Now().Format("2006-01-02 15:04:05"))
				if err := gitCommit(syncCtx, jsonlPath, message); err != nil {
					log.log("Commit failed: %v", err)
					return
				}
				log.log("Committed changes")
			}
		}
		// Pull from remote after the local commit.
		if err := gitPull(syncCtx); err != nil {
			log.log("Pull failed: %v", err)
			return
		}
		log.log("Pulled from remote")
		// Count issues before import for validation
		beforeCount, err := countDBIssues(syncCtx, store)
		if err != nil {
			log.log("Failed to count issues before import: %v", err)
			return
		}
		if err := importToJSONLWithStore(syncCtx, store, jsonlPath); err != nil {
			log.log("Import failed: %v", err)
			return
		}
		log.log("Imported from JSONL")
		// Validate import didn't cause data loss
		afterCount, err := countDBIssues(syncCtx, store)
		if err != nil {
			log.log("Failed to count issues after import: %v", err)
			return
		}
		if err := validatePostImport(beforeCount, afterCount); err != nil {
			log.log("Post-import validation failed: %v", err)
			return
		}
		// Push only when both auto-commit and auto-push are enabled.
		if autoPush && autoCommit {
			if err := gitPush(syncCtx); err != nil {
				log.log("Push failed: %v", err)
				return
			}
			log.log("Pushed to remote")
		}
		log.log("Sync cycle complete")
	}
}