diff --git a/internal/daemon/registry.go b/internal/daemon/registry.go index 03e642fb..fc2d6690 100644 --- a/internal/daemon/registry.go +++ b/internal/daemon/registry.go @@ -7,6 +7,8 @@ import ( "path/filepath" "sync" "time" + + "github.com/steveyegge/beads/internal/lockfile" ) // RegistryEntry represents a daemon entry in the registry @@ -21,8 +23,9 @@ type RegistryEntry struct { // Registry manages the global daemon registry file type Registry struct { - path string - mu sync.Mutex + path string + lockPath string + mu sync.Mutex // in-process mutex (cross-process uses file lock) } // NewRegistry creates a new registry instance @@ -39,14 +42,36 @@ func NewRegistry() (*Registry, error) { } registryPath := filepath.Join(beadsDir, "registry.json") - return &Registry{path: registryPath}, nil + lockPath := filepath.Join(beadsDir, "registry.lock") + return &Registry{path: registryPath, lockPath: lockPath}, nil } -// readEntries reads all entries from the registry file -func (r *Registry) readEntries() ([]RegistryEntry, error) { +// withFileLock executes fn while holding an exclusive file lock on the registry. +// This provides cross-process synchronization for read-modify-write operations. +func (r *Registry) withFileLock(fn func() error) error { r.mu.Lock() defer r.mu.Unlock() + // Open or create lock file + // nolint:gosec // G304: controlled path from config + lockFile, err := os.OpenFile(r.lockPath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return fmt.Errorf("failed to open lock file: %w", err) + } + defer func() { _ = lockFile.Close() }() + + // Acquire exclusive lock (blocking) + if err := lockfile.FlockExclusiveBlocking(lockFile); err != nil { + return fmt.Errorf("failed to acquire lock: %w", err) + } + defer func() { _ = lockfile.FlockUnlock(lockFile) }() + + return fn() +} + +// readEntriesLocked reads all entries from the registry file. +// Caller must hold the file lock. +func (r *Registry) readEntriesLocked() ([]RegistryEntry, error) { data, err := os.ReadFile(r.path) if err != nil { if os.IsNotExist(err) { @@ -63,11 +88,9 @@ func (r *Registry) readEntries() ([]RegistryEntry, error) { return entries, nil } -// writeEntries writes all entries to the registry file -func (r *Registry) writeEntries(entries []RegistryEntry) error { - r.mu.Lock() - defer r.mu.Unlock() - +// writeEntriesLocked writes all entries to the registry file atomically. +// Caller must hold the file lock. +func (r *Registry) writeEntriesLocked(entries []RegistryEntry) error { // Ensure we always write an array, never null if entries == nil { entries = []RegistryEntry{} @@ -78,98 +101,153 @@ func (r *Registry) writeEntries(entries []RegistryEntry) error { return fmt.Errorf("failed to marshal registry: %w", err) } - // nolint:gosec // G306: Registry file needs to be readable for daemon discovery - if err := os.WriteFile(r.path, data, 0644); err != nil { - return fmt.Errorf("failed to write registry: %w", err) + // Write atomically: write to temp file, then rename + dir := filepath.Dir(r.path) + tmpFile, err := os.CreateTemp(dir, "registry-*.json.tmp") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + tmpPath := tmpFile.Name() + + // Write data to temp file + if _, err := tmpFile.Write(data); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("failed to write temp file: %w", err) + } + + // Sync to disk before rename + if err := tmpFile.Sync(); err != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return fmt.Errorf("failed to sync temp file: %w", err) + } + + if err := tmpFile.Close(); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("failed to close temp file: %w", err) + } + + // Atomic rename + if err := os.Rename(tmpPath, r.path); err != nil { + _ = os.Remove(tmpPath) + return fmt.Errorf("failed to rename temp file: %w", err) } return nil } +// readEntries reads all entries from the registry file (with locking). +func (r *Registry) readEntries() ([]RegistryEntry, error) { + var entries []RegistryEntry + err := r.withFileLock(func() error { + var readErr error + entries, readErr = r.readEntriesLocked() + return readErr + }) + return entries, err +} + +// writeEntries writes all entries to the registry file (with locking). +func (r *Registry) writeEntries(entries []RegistryEntry) error { + return r.withFileLock(func() error { + return r.writeEntriesLocked(entries) + }) +} + // Register adds a daemon to the registry func (r *Registry) Register(entry RegistryEntry) error { - entries, err := r.readEntries() - if err != nil { - return err - } - - // Remove any existing entry for this workspace or PID - filtered := []RegistryEntry{} - for _, e := range entries { - if e.WorkspacePath != entry.WorkspacePath && e.PID != entry.PID { - filtered = append(filtered, e) + return r.withFileLock(func() error { + entries, err := r.readEntriesLocked() + if err != nil { + return err } - } - // Add new entry - filtered = append(filtered, entry) + // Remove any existing entry for this workspace or PID + filtered := []RegistryEntry{} + for _, e := range entries { + if e.WorkspacePath != entry.WorkspacePath && e.PID != entry.PID { + filtered = append(filtered, e) + } + } - return r.writeEntries(filtered) + // Add new entry + filtered = append(filtered, entry) + + return r.writeEntriesLocked(filtered) + }) } // Unregister removes a daemon from the registry func (r *Registry) Unregister(workspacePath string, pid int) error { - entries, err := r.readEntries() - if err != nil { - return err - } - - // Filter out entries matching workspace or PID - filtered := []RegistryEntry{} - for _, e := range entries { - if e.WorkspacePath != workspacePath && e.PID != pid { - filtered = append(filtered, e) + return r.withFileLock(func() error { + entries, err := r.readEntriesLocked() + if err != nil { + return err } - } - return r.writeEntries(filtered) + // Filter out entries matching workspace or PID + filtered := []RegistryEntry{} + for _, e := range entries { + if e.WorkspacePath != workspacePath && e.PID != pid { + filtered = append(filtered, e) + } + } + + return r.writeEntriesLocked(filtered) + }) } // List returns all daemons from the registry, automatically cleaning up stale entries func (r *Registry) List() ([]DaemonInfo, error) { - entries, err := r.readEntries() - if err != nil { - return nil, err - } - var daemons []DaemonInfo - var aliveEntries []RegistryEntry - for _, entry := range entries { - // Check if process is still alive - if !isProcessAlive(entry.PID) { - // Stale entry - skip and don't add to alive list - continue + err := r.withFileLock(func() error { + entries, err := r.readEntriesLocked() + if err != nil { + return err } - // Process is alive, add to both lists - aliveEntries = append(aliveEntries, entry) + var aliveEntries []RegistryEntry - // Try to connect and get current status - daemon := discoverDaemon(entry.SocketPath) - - // If connection failed but process is alive, still include basic info - if !daemon.Alive { - daemon.Alive = true // Process exists, socket just might not be ready - daemon.WorkspacePath = entry.WorkspacePath - daemon.DatabasePath = entry.DatabasePath - daemon.SocketPath = entry.SocketPath - daemon.PID = entry.PID - daemon.Version = entry.Version + for _, entry := range entries { + // Check if process is still alive + if !isProcessAlive(entry.PID) { + // Stale entry - skip and don't add to alive list + continue + } + + // Process is alive, add to both lists + aliveEntries = append(aliveEntries, entry) + + // Try to connect and get current status + daemon := discoverDaemon(entry.SocketPath) + + // If connection failed but process is alive, still include basic info + if !daemon.Alive { + daemon.Alive = true // Process exists, socket just might not be ready + daemon.WorkspacePath = entry.WorkspacePath + daemon.DatabasePath = entry.DatabasePath + daemon.SocketPath = entry.SocketPath + daemon.PID = entry.PID + daemon.Version = entry.Version + } + + daemons = append(daemons, daemon) } - daemons = append(daemons, daemon) - } - - // Clean up stale entries from registry - if len(aliveEntries) != len(entries) { - if err := r.writeEntries(aliveEntries); err != nil { - // Log warning but don't fail - we still have the daemon list - fmt.Fprintf(os.Stderr, "Warning: failed to cleanup stale registry entries: %v\n", err) + // Clean up stale entries from registry + if len(aliveEntries) != len(entries) { + if err := r.writeEntriesLocked(aliveEntries); err != nil { + // Log warning but don't fail - we still have the daemon list + fmt.Fprintf(os.Stderr, "Warning: failed to cleanup stale registry entries: %v\n", err) + } } - } - return daemons, nil + return nil + }) + + return daemons, err } // Clear removes all entries from the registry (for testing) diff --git a/internal/lockfile/lock_unix.go b/internal/lockfile/lock_unix.go index 71be24e4..6b5d65fa 100644 --- a/internal/lockfile/lock_unix.go +++ b/internal/lockfile/lock_unix.go @@ -19,3 +19,14 @@ func flockExclusive(f *os.File) error { } return err } + +// FlockExclusiveBlocking acquires an exclusive blocking lock on the file. +// This will wait until the lock is available. +func FlockExclusiveBlocking(f *os.File) error { + return unix.Flock(int(f.Fd()), unix.LOCK_EX) +} + +// FlockUnlock releases a lock on the file. +func FlockUnlock(f *os.File) error { + return unix.Flock(int(f.Fd()), unix.LOCK_UN) +} diff --git a/internal/lockfile/lock_wasm.go b/internal/lockfile/lock_wasm.go index ed9148f3..cd072adf 100644 --- a/internal/lockfile/lock_wasm.go +++ b/internal/lockfile/lock_wasm.go @@ -4,7 +4,6 @@ package lockfile import ( "errors" - "fmt" "os" ) @@ -13,5 +12,17 @@ var errDaemonLocked = errors.New("daemon lock already held by another process") func flockExclusive(f *os.File) error { // WASM doesn't support file locking // In a WASM environment, we're typically single-process anyway - return fmt.Errorf("file locking not supported in WASM") + return nil // No-op in WASM +} + +// FlockExclusiveBlocking acquires an exclusive blocking lock on the file. +// In WASM, this is a no-op since we're single-process. +func FlockExclusiveBlocking(f *os.File) error { + return nil +} + +// FlockUnlock releases a lock on the file. +// In WASM, this is a no-op. +func FlockUnlock(f *os.File) error { + return nil } diff --git a/internal/lockfile/lock_windows.go b/internal/lockfile/lock_windows.go index be9de5fa..99242124 100644 --- a/internal/lockfile/lock_windows.go +++ b/internal/lockfile/lock_windows.go @@ -36,3 +36,34 @@ func flockExclusive(f *os.File) error { return err } + +// FlockExclusiveBlocking acquires an exclusive blocking lock on the file. +// This will wait until the lock is available. +func FlockExclusiveBlocking(f *os.File) error { + // LOCKFILE_EXCLUSIVE_LOCK only (no FAIL_IMMEDIATELY = blocking) + const flags = windows.LOCKFILE_EXCLUSIVE_LOCK + + ol := &windows.Overlapped{} + + return windows.LockFileEx( + windows.Handle(f.Fd()), + flags, + 0, + 0xFFFFFFFF, + 0xFFFFFFFF, + ol, + ) +} + +// FlockUnlock releases a lock on the file. +func FlockUnlock(f *os.File) error { + ol := &windows.Overlapped{} + + return windows.UnlockFileEx( + windows.Handle(f.Fd()), + 0, + 0xFFFFFFFF, + 0xFFFFFFFF, + ol, + ) +}