Remove unused internal/daemonrunner/ package (~1,500 LOC)
Removed old global daemon infrastructure that was replaced by per-workspace daemon architecture. Package had no imports and all functions were unreachable. Closes bd-irq6
This commit is contained in:
@@ -1,26 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import "time"
|
||||
|
||||
// Config holds all configuration for the daemon
|
||||
type Config struct {
|
||||
// Sync behavior
|
||||
Interval time.Duration
|
||||
AutoCommit bool
|
||||
AutoPush bool
|
||||
|
||||
// Scope
|
||||
Global bool
|
||||
|
||||
// Paths
|
||||
LogFile string
|
||||
PIDFile string
|
||||
DBPath string // Local daemon only
|
||||
BeadsDir string // Local daemon: .beads dir, Global daemon: ~/.beads
|
||||
|
||||
// RPC
|
||||
SocketPath string
|
||||
|
||||
// Workspace
|
||||
WorkspacePath string // Only for local daemon: parent of .beads directory
|
||||
}
|
||||
@@ -1,98 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestConfigDefaults(t *testing.T) {
|
||||
cfg := Config{
|
||||
Interval: 5 * time.Second,
|
||||
AutoCommit: true,
|
||||
AutoPush: false,
|
||||
Global: false,
|
||||
}
|
||||
|
||||
if cfg.Interval != 5*time.Second {
|
||||
t.Errorf("Expected Interval 5s, got %v", cfg.Interval)
|
||||
}
|
||||
if !cfg.AutoCommit {
|
||||
t.Error("Expected AutoCommit to be true")
|
||||
}
|
||||
if cfg.AutoPush {
|
||||
t.Error("Expected AutoPush to be false")
|
||||
}
|
||||
if cfg.Global {
|
||||
t.Error("Expected Global to be false")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigLocalDaemon(t *testing.T) {
|
||||
cfg := Config{
|
||||
Global: false,
|
||||
WorkspacePath: "/tmp/test-workspace",
|
||||
BeadsDir: "/tmp/test-workspace/.beads",
|
||||
DBPath: "/tmp/test-workspace/.beads/beads.db",
|
||||
SocketPath: "/tmp/test-workspace/.beads/bd.sock",
|
||||
LogFile: "/tmp/test-workspace/.beads/daemon.log",
|
||||
PIDFile: "/tmp/test-workspace/.beads/daemon.pid",
|
||||
}
|
||||
|
||||
if cfg.Global {
|
||||
t.Error("Expected local daemon (Global=false)")
|
||||
}
|
||||
if cfg.WorkspacePath == "" {
|
||||
t.Error("Expected WorkspacePath to be set for local daemon")
|
||||
}
|
||||
if cfg.DBPath == "" {
|
||||
t.Error("Expected DBPath to be set for local daemon")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigGlobalDaemon(t *testing.T) {
|
||||
cfg := Config{
|
||||
Global: true,
|
||||
BeadsDir: "/home/user/.beads",
|
||||
SocketPath: "/home/user/.beads/global.sock",
|
||||
LogFile: "/home/user/.beads/global-daemon.log",
|
||||
PIDFile: "/home/user/.beads/global-daemon.pid",
|
||||
}
|
||||
|
||||
if !cfg.Global {
|
||||
t.Error("Expected global daemon (Global=true)")
|
||||
}
|
||||
if cfg.WorkspacePath != "" {
|
||||
t.Error("Expected WorkspacePath to be empty for global daemon")
|
||||
}
|
||||
if cfg.DBPath != "" {
|
||||
t.Error("Expected DBPath to be empty for global daemon")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigSyncBehavior(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
autoCommit bool
|
||||
autoPush bool
|
||||
}{
|
||||
{"no sync", false, false},
|
||||
{"commit only", true, false},
|
||||
{"commit and push", true, true},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cfg := Config{
|
||||
AutoCommit: tt.autoCommit,
|
||||
AutoPush: tt.autoPush,
|
||||
}
|
||||
|
||||
if cfg.AutoCommit != tt.autoCommit {
|
||||
t.Errorf("Expected AutoCommit=%v, got %v", tt.autoCommit, cfg.AutoCommit)
|
||||
}
|
||||
if cfg.AutoPush != tt.autoPush {
|
||||
t.Errorf("Expected AutoPush=%v, got %v", tt.autoPush, cfg.AutoPush)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,348 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/beads"
|
||||
"github.com/steveyegge/beads/internal/daemon"
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
// Daemon represents a running background daemon
|
||||
type Daemon struct {
|
||||
cfg Config
|
||||
log *logger
|
||||
logF *lumberjack.Logger
|
||||
store storage.Storage
|
||||
server *rpc.Server
|
||||
lock io.Closer
|
||||
cancel context.CancelFunc
|
||||
|
||||
// Version is the daemon's build version
|
||||
Version string
|
||||
}
|
||||
|
||||
// New creates a new Daemon instance
|
||||
func New(cfg Config, version string) *Daemon {
|
||||
return &Daemon{
|
||||
cfg: cfg,
|
||||
Version: version,
|
||||
}
|
||||
}
|
||||
|
||||
// Start runs the daemon main loop
|
||||
func (d *Daemon) Start() error {
|
||||
// Setup logger
|
||||
d.logF, d.log = d.setupLogger()
|
||||
defer func() { _ = d.logF.Close() }()
|
||||
|
||||
// Determine database path for local daemon
|
||||
if !d.cfg.Global {
|
||||
if err := d.determineDatabasePath(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire daemon lock
|
||||
lock, err := d.setupLock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.lock = lock
|
||||
defer func() { _ = d.lock.Close() }()
|
||||
defer func() { _ = os.Remove(d.cfg.PIDFile) }()
|
||||
|
||||
d.log.log("Daemon started (interval: %v, auto-commit: %v, auto-push: %v)",
|
||||
d.cfg.Interval, d.cfg.AutoCommit, d.cfg.AutoPush)
|
||||
|
||||
// Handle global daemon differently
|
||||
if d.cfg.Global {
|
||||
return d.runGlobalDaemon()
|
||||
}
|
||||
|
||||
// Validate single canonical database
|
||||
if err := d.validateSingleDatabase(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.log.log("Using database: %s", d.cfg.DBPath)
|
||||
|
||||
// Clear any previous daemon-error file on successful startup
|
||||
errFile := filepath.Join(d.cfg.BeadsDir, "daemon-error")
|
||||
if err := os.Remove(errFile); err != nil && !os.IsNotExist(err) {
|
||||
d.log.log("Warning: could not remove daemon-error file: %v", err)
|
||||
}
|
||||
|
||||
// Open database
|
||||
store, err := sqlite.New(d.cfg.DBPath)
|
||||
if err != nil {
|
||||
d.log.log("Error: cannot open database: %v", err)
|
||||
return fmt.Errorf("cannot open database: %w", err)
|
||||
}
|
||||
d.store = store
|
||||
defer func() { _ = d.store.Close() }()
|
||||
d.log.log("Database opened: %s", d.cfg.DBPath)
|
||||
|
||||
// Validate database fingerprint
|
||||
if err := d.validateDatabaseFingerprint(); err != nil {
|
||||
if os.Getenv("BEADS_IGNORE_REPO_MISMATCH") != "1" {
|
||||
d.log.log("Error: %v", err)
|
||||
return err
|
||||
}
|
||||
d.log.log("Warning: repository mismatch ignored (BEADS_IGNORE_REPO_MISMATCH=1)")
|
||||
}
|
||||
|
||||
// Validate schema version
|
||||
if err := d.validateSchemaVersion(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start RPC server
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
d.cancel = cancel
|
||||
defer cancel()
|
||||
|
||||
server, serverErrChan, err := d.startRPCServer(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.server = server
|
||||
|
||||
// Register in global registry
|
||||
if err := d.registerDaemon(); err != nil {
|
||||
d.log.log("Warning: failed to register daemon: %v", err)
|
||||
} else {
|
||||
defer d.unregisterDaemon()
|
||||
}
|
||||
|
||||
// Run sync loops
|
||||
return d.runSyncLoop(ctx, serverErrChan)
|
||||
}
|
||||
|
||||
func (d *Daemon) runGlobalDaemon() error {
|
||||
globalDir, err := getGlobalBeadsDir()
|
||||
if err != nil {
|
||||
d.log.log("Error: cannot get global beads directory: %v", err)
|
||||
return err
|
||||
}
|
||||
d.cfg.SocketPath = getSocketPath(globalDir)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
d.cancel = cancel
|
||||
defer cancel()
|
||||
|
||||
server, _, err := d.startRPCServer(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.server = server
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
defer signal.Stop(sigChan)
|
||||
|
||||
sig := <-sigChan
|
||||
d.log.log("Received signal: %v", sig)
|
||||
d.log.log("Shutting down global daemon...")
|
||||
|
||||
cancel()
|
||||
if err := d.server.Stop(); err != nil {
|
||||
d.log.log("Error stopping server: %v", err)
|
||||
}
|
||||
|
||||
d.log.log("Global daemon stopped")
|
||||
return nil
|
||||
}
|
||||
|
||||
func getGlobalBeadsDir() (string, error) {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("cannot get home directory: %w", err)
|
||||
}
|
||||
|
||||
beadsDir := filepath.Join(home, ".beads")
|
||||
if err := os.MkdirAll(beadsDir, 0700); err != nil {
|
||||
return "", fmt.Errorf("cannot create global beads directory: %w", err)
|
||||
}
|
||||
|
||||
return beadsDir, nil
|
||||
}
|
||||
|
||||
func (d *Daemon) setupLock() (io.Closer, error) {
|
||||
beadsDir := filepath.Dir(d.cfg.PIDFile)
|
||||
lock, err := acquireDaemonLock(beadsDir, d.cfg.DBPath, d.Version)
|
||||
if err != nil {
|
||||
if err == ErrDaemonLocked {
|
||||
d.log.log("Daemon already running (lock held), exiting")
|
||||
} else {
|
||||
d.log.log("Error acquiring daemon lock: %v", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ensurePIDFileCorrect(d.cfg.PIDFile); err != nil {
|
||||
d.log.log("Warning: failed to verify PID file: %v", err)
|
||||
}
|
||||
|
||||
return lock, nil
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the daemon
|
||||
func (d *Daemon) Stop() error {
|
||||
if d.cancel != nil {
|
||||
d.cancel()
|
||||
}
|
||||
if d.server != nil {
|
||||
return d.server.Stop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) determineDatabasePath() error {
|
||||
if d.cfg.DBPath != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Use public API to find database
|
||||
foundDB := beads.FindDatabasePath()
|
||||
if foundDB == "" {
|
||||
d.log.log("Error: no beads database found")
|
||||
d.log.log("Hint: run 'bd init' to create a database or set BEADS_DB environment variable")
|
||||
return fmt.Errorf("no beads database found")
|
||||
}
|
||||
|
||||
d.cfg.DBPath = foundDB
|
||||
d.cfg.BeadsDir = filepath.Dir(foundDB)
|
||||
d.cfg.WorkspacePath = filepath.Dir(d.cfg.BeadsDir)
|
||||
d.cfg.SocketPath = getSocketPath(d.cfg.BeadsDir)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) validateSingleDatabase() error {
|
||||
// Check for multiple .db files (ambiguity error)
|
||||
matches, err := filepath.Glob(filepath.Join(d.cfg.BeadsDir, "*.db"))
|
||||
if err == nil && len(matches) > 1 {
|
||||
// Filter out backup files
|
||||
var validDBs []string
|
||||
for _, match := range matches {
|
||||
baseName := filepath.Base(match)
|
||||
if !strings.Contains(baseName, ".backup") && baseName != "vc.db" {
|
||||
validDBs = append(validDBs, match)
|
||||
}
|
||||
}
|
||||
if len(validDBs) > 1 {
|
||||
errMsg := fmt.Sprintf("Error: Multiple database files found in %s:\n", d.cfg.BeadsDir)
|
||||
for _, db := range validDBs {
|
||||
errMsg += fmt.Sprintf(" - %s\n", filepath.Base(db))
|
||||
}
|
||||
errMsg += fmt.Sprintf("\nBeads requires a single canonical database: %s\n", beads.CanonicalDatabaseName)
|
||||
errMsg += "Run 'bd init' to migrate legacy databases or manually remove old databases\n"
|
||||
errMsg += "Or run 'bd doctor' for more diagnostics"
|
||||
|
||||
d.log.log(errMsg)
|
||||
|
||||
// Write error to file so user can see it without checking logs
|
||||
errFile := filepath.Join(d.cfg.BeadsDir, "daemon-error")
|
||||
// nolint:gosec // G306: Error file needs to be readable for debugging
|
||||
_ = os.WriteFile(errFile, []byte(errMsg), 0644)
|
||||
|
||||
return fmt.Errorf("multiple database files found")
|
||||
}
|
||||
}
|
||||
|
||||
// Validate using canonical name
|
||||
dbBaseName := filepath.Base(d.cfg.DBPath)
|
||||
if dbBaseName != beads.CanonicalDatabaseName {
|
||||
d.log.log("Error: Non-canonical database name: %s", dbBaseName)
|
||||
d.log.log("Expected: %s", beads.CanonicalDatabaseName)
|
||||
d.log.log("")
|
||||
d.log.log("Run 'bd init' to migrate to canonical name")
|
||||
return fmt.Errorf("non-canonical database name: %s", dbBaseName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) validateSchemaVersion() error {
|
||||
ctx := context.Background()
|
||||
dbVersion, err := d.store.GetMetadata(ctx, "bd_version")
|
||||
if err != nil && err.Error() != "metadata key not found: bd_version" {
|
||||
d.log.log("Error: failed to read database version: %v", err)
|
||||
return fmt.Errorf("failed to read database version: %w", err)
|
||||
}
|
||||
|
||||
mismatch, missing := checkVersionMismatch(dbVersion, d.Version)
|
||||
|
||||
if mismatch {
|
||||
d.log.log("Error: Database schema version mismatch")
|
||||
d.log.log(" Database version: %s", dbVersion)
|
||||
d.log.log(" Daemon version: %s", d.Version)
|
||||
d.log.log("")
|
||||
d.log.log("The database was created with a different version of bd.")
|
||||
d.log.log("This may cause compatibility issues.")
|
||||
d.log.log("")
|
||||
d.log.log("Options:")
|
||||
d.log.log(" 1. Run 'bd migrate' to update the database to the current version")
|
||||
d.log.log(" 2. Upgrade/downgrade bd to match database version: %s", dbVersion)
|
||||
d.log.log(" 3. Set BEADS_IGNORE_VERSION_MISMATCH=1 to proceed anyway (not recommended)")
|
||||
d.log.log("")
|
||||
|
||||
if os.Getenv("BEADS_IGNORE_VERSION_MISMATCH") != "1" {
|
||||
return fmt.Errorf("database version mismatch")
|
||||
}
|
||||
d.log.log("Warning: Proceeding despite version mismatch (BEADS_IGNORE_VERSION_MISMATCH=1)")
|
||||
} else if missing {
|
||||
d.log.log("Warning: Database missing version metadata, setting to %s", d.Version)
|
||||
if err := d.store.SetMetadata(ctx, "bd_version", d.Version); err != nil {
|
||||
d.log.log("Error: failed to set database version: %v", err)
|
||||
return fmt.Errorf("failed to set database version: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) registerDaemon() error {
|
||||
registry, err := daemon.NewRegistry()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entry := daemon.RegistryEntry{
|
||||
WorkspacePath: d.cfg.WorkspacePath,
|
||||
SocketPath: d.cfg.SocketPath,
|
||||
DatabasePath: d.cfg.DBPath,
|
||||
PID: os.Getpid(),
|
||||
Version: d.Version,
|
||||
StartedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := registry.Register(entry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.log.log("Registered in global registry")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Daemon) unregisterDaemon() {
|
||||
registry, err := daemon.NewRegistry()
|
||||
if err != nil {
|
||||
d.log.log("Warning: failed to create registry for unregister: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := registry.Unregister(d.cfg.WorkspacePath, os.Getpid()); err != nil {
|
||||
d.log.log("Warning: failed to unregister daemon: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1,129 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNew(t *testing.T) {
|
||||
cfg := Config{
|
||||
Interval: 5 * time.Second,
|
||||
AutoCommit: true,
|
||||
Global: false,
|
||||
}
|
||||
|
||||
daemon := New(cfg, "0.19.0")
|
||||
|
||||
if daemon == nil {
|
||||
t.Fatal("Expected non-nil daemon")
|
||||
}
|
||||
if daemon.cfg.Interval != cfg.Interval {
|
||||
t.Errorf("Expected interval %v, got %v", cfg.Interval, daemon.cfg.Interval)
|
||||
}
|
||||
if daemon.Version != "0.19.0" {
|
||||
t.Errorf("Expected version 0.19.0, got %s", daemon.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStop(t *testing.T) {
|
||||
cfg := Config{
|
||||
Interval: 5 * time.Second,
|
||||
}
|
||||
daemon := New(cfg, "0.19.0")
|
||||
|
||||
// Stop should not error even with no server running
|
||||
if err := daemon.Stop(); err != nil {
|
||||
t.Errorf("Stop() returned unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDetermineDatabasePath(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
beadsDir := filepath.Join(tmpDir, ".beads")
|
||||
dbPath := filepath.Join(beadsDir, "beads.db")
|
||||
|
||||
if err := os.MkdirAll(beadsDir, 0750); err != nil {
|
||||
t.Fatalf("Failed to create beads dir: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(dbPath, []byte("test"), 0600); err != nil {
|
||||
t.Fatalf("Failed to create db file: %v", err)
|
||||
}
|
||||
|
||||
cfg := Config{
|
||||
WorkspacePath: tmpDir,
|
||||
}
|
||||
daemon := New(cfg, "0.19.0")
|
||||
|
||||
// Override working directory for test
|
||||
oldWd, _ := os.Getwd()
|
||||
defer func() { _ = os.Chdir(oldWd) }()
|
||||
if err := os.Chdir(tmpDir); err != nil {
|
||||
t.Fatalf("Failed to change directory: %v", err)
|
||||
}
|
||||
|
||||
if err := daemon.determineDatabasePath(); err != nil {
|
||||
t.Errorf("determineDatabasePath() failed: %v", err)
|
||||
}
|
||||
|
||||
// Use EvalSymlinks to handle /var vs /private/var on macOS
|
||||
expectedDB, _ := filepath.EvalSymlinks(dbPath)
|
||||
actualDB, _ := filepath.EvalSymlinks(daemon.cfg.DBPath)
|
||||
if actualDB != expectedDB {
|
||||
t.Errorf("Expected DBPath %s, got %s", expectedDB, actualDB)
|
||||
}
|
||||
|
||||
expectedBeadsDir, _ := filepath.EvalSymlinks(beadsDir)
|
||||
actualBeadsDir, _ := filepath.EvalSymlinks(daemon.cfg.BeadsDir)
|
||||
if actualBeadsDir != expectedBeadsDir {
|
||||
t.Errorf("Expected BeadsDir %s, got %s", expectedBeadsDir, actualBeadsDir)
|
||||
}
|
||||
|
||||
expectedWS, _ := filepath.EvalSymlinks(tmpDir)
|
||||
actualWS, _ := filepath.EvalSymlinks(daemon.cfg.WorkspacePath)
|
||||
if actualWS != expectedWS {
|
||||
t.Errorf("Expected WorkspacePath %s, got %s", expectedWS, actualWS)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDetermineDatabasePathAlreadySet(t *testing.T) {
|
||||
existingPath := "/already/set/beads.db"
|
||||
cfg := Config{
|
||||
DBPath: existingPath,
|
||||
}
|
||||
daemon := New(cfg, "0.19.0")
|
||||
|
||||
if err := daemon.determineDatabasePath(); err != nil {
|
||||
t.Errorf("determineDatabasePath() failed: %v", err)
|
||||
}
|
||||
|
||||
if daemon.cfg.DBPath != existingPath {
|
||||
t.Errorf("Expected DBPath unchanged: %s, got %s", existingPath, daemon.cfg.DBPath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetGlobalBeadsDir(t *testing.T) {
|
||||
beadsDir, err := getGlobalBeadsDir()
|
||||
if err != nil {
|
||||
t.Fatalf("getGlobalBeadsDir() failed: %v", err)
|
||||
}
|
||||
|
||||
if beadsDir == "" {
|
||||
t.Error("Expected non-empty beads directory")
|
||||
}
|
||||
|
||||
// Check directory was created
|
||||
if stat, err := os.Stat(beadsDir); err != nil {
|
||||
t.Errorf("Global beads directory not created: %v", err)
|
||||
} else if !stat.IsDir() {
|
||||
t.Error("Global beads path is not a directory")
|
||||
}
|
||||
|
||||
// Verify it's in home directory
|
||||
home, _ := os.UserHomeDir()
|
||||
expectedPath := filepath.Join(home, ".beads")
|
||||
if beadsDir != expectedPath {
|
||||
t.Errorf("Expected %s, got %s", expectedPath, beadsDir)
|
||||
}
|
||||
}
|
||||
@@ -1,72 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/steveyegge/beads/internal/beads"
|
||||
)
|
||||
|
||||
func (d *Daemon) validateDatabaseFingerprint() error {
|
||||
ctx := context.Background()
|
||||
|
||||
// Get stored repo ID
|
||||
storedRepoID, err := d.store.GetMetadata(ctx, "repo_id")
|
||||
if err != nil && err.Error() != "metadata key not found: repo_id" {
|
||||
return fmt.Errorf("failed to read repo_id: %w", err)
|
||||
}
|
||||
|
||||
// If no repo_id, this is a legacy database
|
||||
if storedRepoID == "" {
|
||||
return fmt.Errorf(`
|
||||
LEGACY DATABASE DETECTED!
|
||||
|
||||
This database was created before version 0.17.5 and lacks a repository fingerprint.
|
||||
To continue using this database, you must explicitly set its repository ID:
|
||||
|
||||
bd migrate --update-repo-id
|
||||
|
||||
This ensures the database is bound to this repository and prevents accidental
|
||||
database sharing between different repositories.
|
||||
|
||||
If this is a fresh clone, run:
|
||||
rm -rf .beads && bd init
|
||||
|
||||
Note: Auto-claiming legacy databases is intentionally disabled to prevent
|
||||
silent corruption when databases are copied between repositories.
|
||||
`)
|
||||
}
|
||||
|
||||
// Validate repo ID matches current repository
|
||||
currentRepoID, err := beads.ComputeRepoID()
|
||||
if err != nil {
|
||||
d.log.log("Warning: could not compute current repository ID: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if storedRepoID != currentRepoID {
|
||||
return fmt.Errorf(`
|
||||
DATABASE MISMATCH DETECTED!
|
||||
|
||||
This database belongs to a different repository:
|
||||
Database repo ID: %s
|
||||
Current repo ID: %s
|
||||
|
||||
This usually means:
|
||||
1. You copied a .beads directory from another repo (don't do this!)
|
||||
2. Git remote URL changed (run 'bd migrate --update-repo-id')
|
||||
3. Database corruption
|
||||
4. bd was upgraded and URL canonicalization changed
|
||||
|
||||
Solutions:
|
||||
- If remote URL changed: bd migrate --update-repo-id
|
||||
- If bd was upgraded: bd migrate --update-repo-id
|
||||
- If wrong database: rm -rf .beads && bd init
|
||||
- If correct database: BEADS_IGNORE_REPO_MISMATCH=1 bd daemon
|
||||
(Warning: This can cause data corruption across clones!)
|
||||
`, storedRepoID[:8], currentRepoID[:8])
|
||||
}
|
||||
|
||||
d.log.log("Repository fingerprint validated: %s", currentRepoID[:8])
|
||||
return nil
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
//go:build unix
|
||||
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// flockExclusive acquires an exclusive non-blocking lock on the file
|
||||
func flockExclusive(f *os.File) error {
|
||||
err := unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB)
|
||||
if err == unix.EWOULDBLOCK {
|
||||
return ErrDaemonLocked
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
//go:build windows
|
||||
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
// flockExclusive acquires an exclusive non-blocking lock on the file using LockFileEx
|
||||
func flockExclusive(f *os.File) error {
|
||||
// LOCKFILE_EXCLUSIVE_LOCK (2) | LOCKFILE_FAIL_IMMEDIATELY (1) = 3
|
||||
const flags = windows.LOCKFILE_EXCLUSIVE_LOCK | windows.LOCKFILE_FAIL_IMMEDIATELY
|
||||
|
||||
// Create overlapped structure for the entire file
|
||||
ol := &windows.Overlapped{}
|
||||
|
||||
// Lock entire file (0xFFFFFFFF, 0xFFFFFFFF = maximum range)
|
||||
err := windows.LockFileEx(
|
||||
windows.Handle(f.Fd()),
|
||||
flags,
|
||||
0, // reserved
|
||||
0xFFFFFFFF, // number of bytes to lock (low)
|
||||
0xFFFFFFFF, // number of bytes to lock (high)
|
||||
ol,
|
||||
)
|
||||
|
||||
if err == windows.ERROR_LOCK_VIOLATION || err == syscall.EWOULDBLOCK {
|
||||
return ErrDaemonLocked
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -1,120 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GitClient provides an interface for git operations to enable testing
|
||||
type GitClient interface {
|
||||
HasUpstream() bool
|
||||
HasChanges(ctx context.Context, filePath string) (bool, error)
|
||||
Commit(ctx context.Context, filePath string, message string) error
|
||||
Push(ctx context.Context) error
|
||||
Pull(ctx context.Context) error
|
||||
}
|
||||
|
||||
// DefaultGitClient implements GitClient using os/exec
|
||||
type DefaultGitClient struct{}
|
||||
|
||||
// NewGitClient creates a new default git client
|
||||
func NewGitClient() GitClient {
|
||||
return &DefaultGitClient{}
|
||||
}
|
||||
|
||||
// HasUpstream checks if the current branch has an upstream configured
|
||||
func (g *DefaultGitClient) HasUpstream() bool {
|
||||
return gitHasUpstream()
|
||||
}
|
||||
|
||||
// HasChanges checks if the specified file has uncommitted changes
|
||||
func (g *DefaultGitClient) HasChanges(ctx context.Context, filePath string) (bool, error) {
|
||||
return gitHasChanges(ctx, filePath)
|
||||
}
|
||||
|
||||
// Commit commits the specified file
|
||||
func (g *DefaultGitClient) Commit(ctx context.Context, filePath string, message string) error {
|
||||
return gitCommit(ctx, filePath, message)
|
||||
}
|
||||
|
||||
// Push pushes to the current branch's upstream
|
||||
func (g *DefaultGitClient) Push(ctx context.Context) error {
|
||||
return gitPush(ctx)
|
||||
}
|
||||
|
||||
// Pull pulls from the current branch's upstream
|
||||
func (g *DefaultGitClient) Pull(ctx context.Context) error {
|
||||
return gitPull(ctx)
|
||||
}
|
||||
|
||||
// isGitRepo checks if we're in a git repository
|
||||
func isGitRepo() bool {
|
||||
cmd := exec.Command("git", "rev-parse", "--git-dir")
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
|
||||
// gitHasUpstream checks if the current branch has an upstream configured
|
||||
func gitHasUpstream() bool {
|
||||
cmd := exec.Command("git", "rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{u}")
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
|
||||
// gitHasChanges checks if the specified file has uncommitted changes
|
||||
func gitHasChanges(ctx context.Context, filePath string) (bool, error) {
|
||||
cmd := exec.CommandContext(ctx, "git", "status", "--porcelain", filePath)
|
||||
output, err := cmd.Output()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("git status failed: %w", err)
|
||||
}
|
||||
return len(strings.TrimSpace(string(output))) > 0, nil
|
||||
}
|
||||
|
||||
// gitCommit commits the specified file
|
||||
func gitCommit(ctx context.Context, filePath string, message string) error {
|
||||
// Stage the file
|
||||
addCmd := exec.CommandContext(ctx, "git", "add", filePath)
|
||||
if err := addCmd.Run(); err != nil {
|
||||
return fmt.Errorf("git add failed: %w", err)
|
||||
}
|
||||
|
||||
// Generate message if not provided
|
||||
if message == "" {
|
||||
message = fmt.Sprintf("bd sync: %s", time.Now().Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
|
||||
// Commit
|
||||
commitCmd := exec.CommandContext(ctx, "git", "commit", "-m", message)
|
||||
output, err := commitCmd.CombinedOutput()
|
||||
if err != nil {
|
||||
// Treat "nothing to commit" as success (idempotent)
|
||||
if strings.Contains(strings.ToLower(string(output)), "nothing to commit") {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("git commit failed: %w\n%s", err, output)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// gitPull pulls from the current branch's upstream
|
||||
func gitPull(ctx context.Context) error {
|
||||
cmd := exec.CommandContext(ctx, "git", "pull", "--ff-only")
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("git pull failed: %w\n%s", err, output)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// gitPush pushes to the current branch's upstream
|
||||
func gitPush(ctx context.Context) error {
|
||||
cmd := exec.CommandContext(ctx, "git", "push")
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("git push failed: %w\n%s", err, output)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
// logger provides simple logging with timestamp
|
||||
type logger struct {
|
||||
logFunc func(string, ...interface{})
|
||||
}
|
||||
|
||||
func (l *logger) log(format string, args ...interface{}) {
|
||||
l.logFunc(format, args...)
|
||||
}
|
||||
|
||||
// setupLogger configures the rotating log file
|
||||
func (d *Daemon) setupLogger() (*lumberjack.Logger, *logger) {
|
||||
maxSizeMB := getEnvInt("BEADS_DAEMON_LOG_MAX_SIZE", 10)
|
||||
maxBackups := getEnvInt("BEADS_DAEMON_LOG_MAX_BACKUPS", 3)
|
||||
maxAgeDays := getEnvInt("BEADS_DAEMON_LOG_MAX_AGE", 7)
|
||||
compress := getEnvBool("BEADS_DAEMON_LOG_COMPRESS", true)
|
||||
|
||||
logF := &lumberjack.Logger{
|
||||
Filename: d.cfg.LogFile,
|
||||
MaxSize: maxSizeMB,
|
||||
MaxBackups: maxBackups,
|
||||
MaxAge: maxAgeDays,
|
||||
Compress: compress,
|
||||
}
|
||||
|
||||
log := &logger{
|
||||
logFunc: func(format string, args ...interface{}) {
|
||||
msg := fmt.Sprintf(format, args...)
|
||||
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
||||
_, _ = fmt.Fprintf(logF, "[%s] %s\n", timestamp, msg)
|
||||
},
|
||||
}
|
||||
|
||||
return logF, log
|
||||
}
|
||||
|
||||
// getEnvInt reads an integer from environment variable with a default value
|
||||
func getEnvInt(key string, defaultValue int) int {
|
||||
if val := os.Getenv(key); val != "" {
|
||||
var parsed int
|
||||
if _, err := fmt.Sscanf(val, "%d", &parsed); err == nil {
|
||||
return parsed
|
||||
}
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
// getEnvBool reads a boolean from environment variable with a default value
|
||||
func getEnvBool(key string, defaultValue bool) bool {
|
||||
if val := os.Getenv(key); val != "" {
|
||||
return val == "true" || val == "1"
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
@@ -1,129 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrDaemonLocked = errors.New("daemon lock already held by another process")
|
||||
|
||||
// DaemonLockInfo represents the metadata stored in the daemon.lock file
|
||||
type DaemonLockInfo struct {
|
||||
PID int `json:"pid"`
|
||||
Database string `json:"database"`
|
||||
Version string `json:"version"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
}
|
||||
|
||||
// DaemonLock represents a held lock on the daemon.lock file
|
||||
type DaemonLock struct {
|
||||
file *os.File
|
||||
path string
|
||||
}
|
||||
|
||||
// Close releases the daemon lock
|
||||
func (l *DaemonLock) Close() error {
|
||||
if l.file == nil {
|
||||
return nil
|
||||
}
|
||||
err := l.file.Close()
|
||||
l.file = nil
|
||||
return err
|
||||
}
|
||||
|
||||
// getPIDFilePath returns the path to daemon.pid in the given beads directory
|
||||
func getPIDFilePath(beadsDir string) string {
|
||||
return filepath.Join(beadsDir, "daemon.pid")
|
||||
}
|
||||
|
||||
// getSocketPath returns the path to bd.sock in the given beads directory
|
||||
func getSocketPath(beadsDir string) string {
|
||||
return filepath.Join(beadsDir, "bd.sock")
|
||||
}
|
||||
|
||||
// readPIDFile reads the PID from daemon.pid
|
||||
func readPIDFile(pidFile string) (int, error) {
|
||||
// #nosec G304 - controlled path from config
|
||||
data, err := os.ReadFile(pidFile)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
pid, err := strconv.Atoi(strings.TrimSpace(string(data)))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid PID in file: %w", err)
|
||||
}
|
||||
return pid, nil
|
||||
}
|
||||
|
||||
// writePIDFile writes the current process PID to daemon.pid
|
||||
func writePIDFile(pidFile string) error {
|
||||
return os.WriteFile(pidFile, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0600)
|
||||
}
|
||||
|
||||
// ensurePIDFileCorrect verifies PID file has correct PID, fixes if wrong
|
||||
func ensurePIDFileCorrect(pidFile string) error {
|
||||
myPID := os.Getpid()
|
||||
if pid, err := readPIDFile(pidFile); err == nil && pid == myPID {
|
||||
return nil
|
||||
}
|
||||
return writePIDFile(pidFile)
|
||||
}
|
||||
|
||||
// checkVersionMismatch checks if database version matches daemon version
|
||||
func checkVersionMismatch(dbVersion, daemonVersion string) (mismatch bool, missing bool) {
|
||||
if dbVersion == "" {
|
||||
return false, true
|
||||
}
|
||||
if dbVersion != daemonVersion {
|
||||
return true, false
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
// acquireDaemonLock attempts to acquire an exclusive lock on daemon.lock
|
||||
func acquireDaemonLock(beadsDir string, dbPath string, version string) (*DaemonLock, error) {
|
||||
lockPath := filepath.Join(beadsDir, "daemon.lock")
|
||||
|
||||
// Open or create the lock file
|
||||
// #nosec G304 - controlled path from config
|
||||
f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot open lock file: %w", err)
|
||||
}
|
||||
|
||||
// Try to acquire exclusive non-blocking lock
|
||||
if err := flockExclusive(f); err != nil {
|
||||
_ = f.Close()
|
||||
if err == ErrDaemonLocked {
|
||||
return nil, ErrDaemonLocked
|
||||
}
|
||||
return nil, fmt.Errorf("cannot lock file: %w", err)
|
||||
}
|
||||
|
||||
// Write JSON metadata to the lock file
|
||||
lockInfo := DaemonLockInfo{
|
||||
PID: os.Getpid(),
|
||||
Database: dbPath,
|
||||
Version: version,
|
||||
StartedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
_ = f.Truncate(0)
|
||||
_, _ = f.Seek(0, 0)
|
||||
encoder := json.NewEncoder(f)
|
||||
encoder.SetIndent("", " ")
|
||||
_ = encoder.Encode(lockInfo)
|
||||
_ = f.Sync()
|
||||
|
||||
// Also write PID file for Windows compatibility
|
||||
pidFile := getPIDFilePath(beadsDir)
|
||||
_ = writePIDFile(pidFile)
|
||||
|
||||
return &DaemonLock{file: f, path: lockPath}, nil
|
||||
}
|
||||
@@ -1,124 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDaemonLockBasics(t *testing.T) {
|
||||
// Skip on Windows - file locking prevents reading lock file while locked
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("Windows file locking prevents reading locked files")
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, "beads.db")
|
||||
|
||||
// Acquire lock
|
||||
lock, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire lock: %v", err)
|
||||
}
|
||||
defer lock.Close()
|
||||
|
||||
// Verify lock file was created
|
||||
lockPath := filepath.Join(tmpDir, "daemon.lock")
|
||||
if _, err := os.Stat(lockPath); os.IsNotExist(err) {
|
||||
t.Error("Lock file was not created")
|
||||
}
|
||||
|
||||
// Verify PID file was created
|
||||
pidPath := filepath.Join(tmpDir, "daemon.pid")
|
||||
if _, err := os.Stat(pidPath); os.IsNotExist(err) {
|
||||
t.Error("PID file was not created")
|
||||
}
|
||||
|
||||
// Read and verify lock metadata
|
||||
data, err := os.ReadFile(lockPath)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read lock file: %v", err)
|
||||
}
|
||||
|
||||
var info DaemonLockInfo
|
||||
if err := json.Unmarshal(data, &info); err != nil {
|
||||
t.Fatalf("Failed to parse lock file: %v", err)
|
||||
}
|
||||
|
||||
if info.PID != os.Getpid() {
|
||||
t.Errorf("Expected PID %d, got %d", os.Getpid(), info.PID)
|
||||
}
|
||||
if info.Database != dbPath {
|
||||
t.Errorf("Expected database %s, got %s", dbPath, info.Database)
|
||||
}
|
||||
if info.Version != "0.19.0" {
|
||||
t.Errorf("Expected version 0.19.0, got %s", info.Version)
|
||||
}
|
||||
if info.StartedAt.IsZero() {
|
||||
t.Error("Expected non-zero StartedAt timestamp")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemonLockExclusive(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, "beads.db")
|
||||
|
||||
// Acquire first lock
|
||||
lock1, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire first lock: %v", err)
|
||||
}
|
||||
defer lock1.Close()
|
||||
|
||||
// Try to acquire second lock (should fail)
|
||||
lock2, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != ErrDaemonLocked {
|
||||
if lock2 != nil {
|
||||
lock2.Close()
|
||||
}
|
||||
t.Errorf("Expected ErrDaemonLocked, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemonLockRelease(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, "beads.db")
|
||||
|
||||
// Acquire lock
|
||||
lock, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire lock: %v", err)
|
||||
}
|
||||
|
||||
// Release lock
|
||||
if err := lock.Close(); err != nil {
|
||||
t.Fatalf("Failed to release lock: %v", err)
|
||||
}
|
||||
|
||||
// Should be able to acquire again after release
|
||||
lock2, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire lock after release: %v", err)
|
||||
}
|
||||
defer lock2.Close()
|
||||
}
|
||||
|
||||
func TestDaemonLockCloseIdempotent(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, "beads.db")
|
||||
|
||||
lock, err := acquireDaemonLock(tmpDir, dbPath, "0.19.0")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to acquire lock: %v", err)
|
||||
}
|
||||
|
||||
// Close multiple times should not error
|
||||
if err := lock.Close(); err != nil {
|
||||
t.Errorf("First close failed: %v", err)
|
||||
}
|
||||
if err := lock.Close(); err != nil {
|
||||
t.Errorf("Second close failed: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -1,37 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
)
|
||||
|
||||
// startRPCServer initializes and starts the RPC server
|
||||
func (d *Daemon) startRPCServer(ctx context.Context) (*rpc.Server, chan error, error) {
|
||||
// Sync daemon version with CLI version
|
||||
rpc.ServerVersion = d.Version
|
||||
|
||||
server := rpc.NewServer(d.cfg.SocketPath, d.store, d.cfg.WorkspacePath, d.cfg.DBPath)
|
||||
serverErrChan := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
d.log.log("Starting RPC server: %s", d.cfg.SocketPath)
|
||||
if err := server.Start(ctx); err != nil {
|
||||
d.log.log("RPC server error: %v", err)
|
||||
serverErrChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-serverErrChan:
|
||||
d.log.log("RPC server failed to start: %v", err)
|
||||
return nil, nil, err
|
||||
case <-server.WaitReady():
|
||||
d.log.log("RPC server ready (socket listening)")
|
||||
case <-time.After(5 * time.Second):
|
||||
d.log.log("WARNING: Server didn't signal ready after 5 seconds (may still be starting)")
|
||||
}
|
||||
|
||||
return server, serverErrChan, nil
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
//go:build unix || linux || darwin
|
||||
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
var daemonSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP}
|
||||
|
||||
func isReloadSignal(sig os.Signal) bool {
|
||||
return sig == syscall.SIGHUP
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
//go:build windows
|
||||
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
var daemonSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
|
||||
|
||||
func isReloadSignal(os.Signal) bool {
|
||||
return false
|
||||
}
|
||||
@@ -1,241 +0,0 @@
|
||||
package daemonrunner
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// runSyncLoop manages the main daemon event loop for sync operations
|
||||
func (d *Daemon) runSyncLoop(ctx context.Context, serverErrChan chan error) error {
|
||||
beadsDir := d.cfg.BeadsDir
|
||||
jsonlPath := filepath.Join(filepath.Dir(beadsDir), "issues.jsonl")
|
||||
|
||||
ticker := time.NewTicker(d.cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
doSync := func() {
|
||||
syncCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
if err := d.exportToJSONL(syncCtx, jsonlPath); err != nil {
|
||||
d.log.log("Export failed: %v", err)
|
||||
return
|
||||
}
|
||||
d.log.log("Exported to JSONL")
|
||||
|
||||
if d.cfg.AutoCommit {
|
||||
hasChanges, err := gitHasChanges(syncCtx, jsonlPath)
|
||||
if err != nil {
|
||||
d.log.log("Error checking git status: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if hasChanges {
|
||||
message := "bd daemon sync: " + time.Now().Format("2006-01-02 15:04:05")
|
||||
if err := gitCommit(syncCtx, jsonlPath, message); err != nil {
|
||||
d.log.log("Commit failed: %v", err)
|
||||
return
|
||||
}
|
||||
d.log.log("Committed changes")
|
||||
}
|
||||
}
|
||||
|
||||
if err := gitPull(syncCtx); err != nil {
|
||||
d.log.log("Pull failed: %v", err)
|
||||
return
|
||||
}
|
||||
d.log.log("Pulled from remote")
|
||||
|
||||
beforeCount, err := d.countDBIssues(syncCtx)
|
||||
if err != nil {
|
||||
d.log.log("Failed to count issues before import: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := d.importFromJSONL(syncCtx, jsonlPath); err != nil {
|
||||
d.log.log("Import failed: %v", err)
|
||||
return
|
||||
}
|
||||
d.log.log("Imported from JSONL")
|
||||
|
||||
afterCount, err := d.countDBIssues(syncCtx)
|
||||
if err != nil {
|
||||
d.log.log("Failed to count issues after import: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := d.validatePostImport(beforeCount, afterCount); err != nil {
|
||||
d.log.log("Post-import validation failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if d.cfg.AutoPush && d.cfg.AutoCommit {
|
||||
if err := gitPush(syncCtx); err != nil {
|
||||
d.log.log("Push failed: %v", err)
|
||||
return
|
||||
}
|
||||
d.log.log("Pushed to remote")
|
||||
}
|
||||
|
||||
d.log.log("Sync cycle complete")
|
||||
}
|
||||
|
||||
return d.runEventLoop(ctx, ticker, doSync, serverErrChan)
|
||||
}
|
||||
|
||||
// runEventLoop handles signals and periodic sync
|
||||
func (d *Daemon) runEventLoop(ctx context.Context, ticker *time.Ticker, doSync func(), serverErrChan chan error) error {
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, daemonSignals...)
|
||||
defer signal.Stop(sigChan)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
doSync()
|
||||
case sig := <-sigChan:
|
||||
if isReloadSignal(sig) {
|
||||
d.log.log("Received reload signal, ignoring (daemon continues running)")
|
||||
continue
|
||||
}
|
||||
d.log.log("Received signal %v, shutting down gracefully...", sig)
|
||||
d.cancel()
|
||||
if err := d.server.Stop(); err != nil {
|
||||
d.log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
d.log.log("Context canceled, shutting down")
|
||||
if err := d.server.Stop(); err != nil {
|
||||
d.log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return nil
|
||||
case err := <-serverErrChan:
|
||||
d.log.log("RPC server failed: %v", err)
|
||||
d.cancel()
|
||||
if err := d.server.Stop(); err != nil {
|
||||
d.log.log("Error stopping RPC server: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// exportToJSONL exports all issues to JSONL format
|
||||
func (d *Daemon) exportToJSONL(ctx context.Context, jsonlPath string) error {
|
||||
// Get all issues
|
||||
issues, err := d.store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get issues: %w", err)
|
||||
}
|
||||
|
||||
// Sort by ID for consistent output
|
||||
sort.Slice(issues, func(i, j int) bool {
|
||||
return issues[i].ID < issues[j].ID
|
||||
})
|
||||
|
||||
// Populate dependencies for all issues
|
||||
allDeps, err := d.store.GetAllDependencyRecords(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get dependencies: %w", err)
|
||||
}
|
||||
for _, issue := range issues {
|
||||
issue.Dependencies = allDeps[issue.ID]
|
||||
}
|
||||
|
||||
// Populate labels for all issues
|
||||
for _, issue := range issues {
|
||||
labels, err := d.store.GetLabels(ctx, issue.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get labels for %s: %w", issue.ID, err)
|
||||
}
|
||||
issue.Labels = labels
|
||||
}
|
||||
|
||||
// Populate comments for all issues
|
||||
for _, issue := range issues {
|
||||
comments, err := d.store.GetIssueComments(ctx, issue.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get comments for %s: %w", issue.ID, err)
|
||||
}
|
||||
issue.Comments = comments
|
||||
}
|
||||
|
||||
// Write to temp file then rename for atomicity
|
||||
tempFile := jsonlPath + ".tmp"
|
||||
f, err := os.Create(tempFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp file: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
encoder := json.NewEncoder(f)
|
||||
for _, issue := range issues {
|
||||
if err := encoder.Encode(issue); err != nil {
|
||||
return fmt.Errorf("failed to encode issue: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := f.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close temp file: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(tempFile, jsonlPath); err != nil {
|
||||
return fmt.Errorf("failed to rename temp file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// importFromJSONL imports issues from JSONL format
|
||||
func (d *Daemon) importFromJSONL(ctx context.Context, jsonlPath string) error {
|
||||
// For now we skip import in the daemon runner - the daemon in cmd/bd will handle it
|
||||
// This is a temporary implementation that delegates to the existing daemon code
|
||||
// TODO(bd-b5a3): Complete the refactoring by extracting the import logic
|
||||
return nil
|
||||
}
|
||||
|
||||
// countDBIssues returns the count of issues in the database
|
||||
func (d *Daemon) countDBIssues(ctx context.Context) (int, error) {
|
||||
// Try fast path with COUNT(*)
|
||||
type dbGetter interface {
|
||||
GetDB() interface{}
|
||||
}
|
||||
|
||||
if getter, ok := d.store.(dbGetter); ok {
|
||||
if db, ok := getter.GetDB().(*sql.DB); ok && db != nil {
|
||||
var count int
|
||||
err := db.QueryRowContext(ctx, "SELECT COUNT(*) FROM issues").Scan(&count)
|
||||
if err == nil {
|
||||
return count, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: load all issues and count them
|
||||
issues, err := d.store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed to count database issues: %w", err)
|
||||
}
|
||||
return len(issues), nil
|
||||
}
|
||||
|
||||
// validatePostImport validates that the import didn't cause data loss
|
||||
func (d *Daemon) validatePostImport(before, after int) error {
|
||||
if after < before {
|
||||
return fmt.Errorf("import reduced issue count: %d → %d (data loss detected!)", before, after)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user