fix(daemon): add cross-process locking to registry (bd-5bj)
The global daemon registry (~/.beads/registry.json) could be corrupted when multiple daemons from different workspaces wrote simultaneously.

Changes:
- Add file locking (flock) for cross-process synchronization
- Use atomic writes (temp file + rename) to prevent partial writes
- Keep entire read-modify-write cycle under single lock
- Add FlockExclusiveBlocking and FlockUnlock to lockfile package

This prevents race conditions that caused JSON corruption like `]]`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,8 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/steveyegge/beads/internal/lockfile"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RegistryEntry represents a daemon entry in the registry
|
// RegistryEntry represents a daemon entry in the registry
|
||||||
@@ -21,8 +23,9 @@ type RegistryEntry struct {
|
|||||||
|
|
||||||
// Registry manages the global daemon registry file
|
// Registry manages the global daemon registry file
|
||||||
type Registry struct {
|
type Registry struct {
|
||||||
path string
|
path string
|
||||||
mu sync.Mutex
|
lockPath string
|
||||||
|
mu sync.Mutex // in-process mutex (cross-process uses file lock)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRegistry creates a new registry instance
|
// NewRegistry creates a new registry instance
|
||||||
@@ -39,14 +42,36 @@ func NewRegistry() (*Registry, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
registryPath := filepath.Join(beadsDir, "registry.json")
|
registryPath := filepath.Join(beadsDir, "registry.json")
|
||||||
return &Registry{path: registryPath}, nil
|
lockPath := filepath.Join(beadsDir, "registry.lock")
|
||||||
|
return &Registry{path: registryPath, lockPath: lockPath}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// readEntries reads all entries from the registry file
|
// withFileLock executes fn while holding an exclusive file lock on the registry.
|
||||||
func (r *Registry) readEntries() ([]RegistryEntry, error) {
|
// This provides cross-process synchronization for read-modify-write operations.
|
||||||
|
func (r *Registry) withFileLock(fn func() error) error {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
defer r.mu.Unlock()
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
// Open or create lock file
|
||||||
|
// nolint:gosec // G304: controlled path from config
|
||||||
|
lockFile, err := os.OpenFile(r.lockPath, os.O_CREATE|os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open lock file: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = lockFile.Close() }()
|
||||||
|
|
||||||
|
// Acquire exclusive lock (blocking)
|
||||||
|
if err := lockfile.FlockExclusiveBlocking(lockFile); err != nil {
|
||||||
|
return fmt.Errorf("failed to acquire lock: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = lockfile.FlockUnlock(lockFile) }()
|
||||||
|
|
||||||
|
return fn()
|
||||||
|
}
|
||||||
|
|
||||||
|
// readEntriesLocked reads all entries from the registry file.
|
||||||
|
// Caller must hold the file lock.
|
||||||
|
func (r *Registry) readEntriesLocked() ([]RegistryEntry, error) {
|
||||||
data, err := os.ReadFile(r.path)
|
data, err := os.ReadFile(r.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
@@ -63,11 +88,9 @@ func (r *Registry) readEntries() ([]RegistryEntry, error) {
|
|||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeEntries writes all entries to the registry file
|
// writeEntriesLocked writes all entries to the registry file atomically.
|
||||||
func (r *Registry) writeEntries(entries []RegistryEntry) error {
|
// Caller must hold the file lock.
|
||||||
r.mu.Lock()
|
func (r *Registry) writeEntriesLocked(entries []RegistryEntry) error {
|
||||||
defer r.mu.Unlock()
|
|
||||||
|
|
||||||
// Ensure we always write an array, never null
|
// Ensure we always write an array, never null
|
||||||
if entries == nil {
|
if entries == nil {
|
||||||
entries = []RegistryEntry{}
|
entries = []RegistryEntry{}
|
||||||
@@ -78,98 +101,153 @@ func (r *Registry) writeEntries(entries []RegistryEntry) error {
|
|||||||
return fmt.Errorf("failed to marshal registry: %w", err)
|
return fmt.Errorf("failed to marshal registry: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint:gosec // G306: Registry file needs to be readable for daemon discovery
|
// Write atomically: write to temp file, then rename
|
||||||
if err := os.WriteFile(r.path, data, 0644); err != nil {
|
dir := filepath.Dir(r.path)
|
||||||
return fmt.Errorf("failed to write registry: %w", err)
|
tmpFile, err := os.CreateTemp(dir, "registry-*.json.tmp")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create temp file: %w", err)
|
||||||
|
}
|
||||||
|
tmpPath := tmpFile.Name()
|
||||||
|
|
||||||
|
// Write data to temp file
|
||||||
|
if _, err := tmpFile.Write(data); err != nil {
|
||||||
|
_ = tmpFile.Close()
|
||||||
|
_ = os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("failed to write temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync to disk before rename
|
||||||
|
if err := tmpFile.Sync(); err != nil {
|
||||||
|
_ = tmpFile.Close()
|
||||||
|
_ = os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("failed to sync temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tmpFile.Close(); err != nil {
|
||||||
|
_ = os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("failed to close temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomic rename
|
||||||
|
if err := os.Rename(tmpPath, r.path); err != nil {
|
||||||
|
_ = os.Remove(tmpPath)
|
||||||
|
return fmt.Errorf("failed to rename temp file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readEntries reads all entries from the registry file (with locking).
|
||||||
|
func (r *Registry) readEntries() ([]RegistryEntry, error) {
|
||||||
|
var entries []RegistryEntry
|
||||||
|
err := r.withFileLock(func() error {
|
||||||
|
var readErr error
|
||||||
|
entries, readErr = r.readEntriesLocked()
|
||||||
|
return readErr
|
||||||
|
})
|
||||||
|
return entries, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeEntries writes all entries to the registry file (with locking).
|
||||||
|
func (r *Registry) writeEntries(entries []RegistryEntry) error {
|
||||||
|
return r.withFileLock(func() error {
|
||||||
|
return r.writeEntriesLocked(entries)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Register adds a daemon to the registry
|
// Register adds a daemon to the registry
|
||||||
func (r *Registry) Register(entry RegistryEntry) error {
|
func (r *Registry) Register(entry RegistryEntry) error {
|
||||||
entries, err := r.readEntries()
|
return r.withFileLock(func() error {
|
||||||
if err != nil {
|
entries, err := r.readEntriesLocked()
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
|
||||||
// Remove any existing entry for this workspace or PID
|
|
||||||
filtered := []RegistryEntry{}
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.WorkspacePath != entry.WorkspacePath && e.PID != entry.PID {
|
|
||||||
filtered = append(filtered, e)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Add new entry
|
// Remove any existing entry for this workspace or PID
|
||||||
filtered = append(filtered, entry)
|
filtered := []RegistryEntry{}
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.WorkspacePath != entry.WorkspacePath && e.PID != entry.PID {
|
||||||
|
filtered = append(filtered, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return r.writeEntries(filtered)
|
// Add new entry
|
||||||
|
filtered = append(filtered, entry)
|
||||||
|
|
||||||
|
return r.writeEntriesLocked(filtered)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unregister removes a daemon from the registry
|
// Unregister removes a daemon from the registry
|
||||||
func (r *Registry) Unregister(workspacePath string, pid int) error {
|
func (r *Registry) Unregister(workspacePath string, pid int) error {
|
||||||
entries, err := r.readEntries()
|
return r.withFileLock(func() error {
|
||||||
if err != nil {
|
entries, err := r.readEntriesLocked()
|
||||||
return err
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
|
||||||
// Filter out entries matching workspace or PID
|
|
||||||
filtered := []RegistryEntry{}
|
|
||||||
for _, e := range entries {
|
|
||||||
if e.WorkspacePath != workspacePath && e.PID != pid {
|
|
||||||
filtered = append(filtered, e)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return r.writeEntries(filtered)
|
// Filter out entries matching workspace or PID
|
||||||
|
filtered := []RegistryEntry{}
|
||||||
|
for _, e := range entries {
|
||||||
|
if e.WorkspacePath != workspacePath && e.PID != pid {
|
||||||
|
filtered = append(filtered, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.writeEntriesLocked(filtered)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns all daemons from the registry, automatically cleaning up stale entries
|
// List returns all daemons from the registry, automatically cleaning up stale entries
|
||||||
func (r *Registry) List() ([]DaemonInfo, error) {
|
func (r *Registry) List() ([]DaemonInfo, error) {
|
||||||
entries, err := r.readEntries()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var daemons []DaemonInfo
|
var daemons []DaemonInfo
|
||||||
var aliveEntries []RegistryEntry
|
|
||||||
|
|
||||||
for _, entry := range entries {
|
err := r.withFileLock(func() error {
|
||||||
// Check if process is still alive
|
entries, err := r.readEntriesLocked()
|
||||||
if !isProcessAlive(entry.PID) {
|
if err != nil {
|
||||||
// Stale entry - skip and don't add to alive list
|
return err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process is alive, add to both lists
|
var aliveEntries []RegistryEntry
|
||||||
aliveEntries = append(aliveEntries, entry)
|
|
||||||
|
|
||||||
// Try to connect and get current status
|
for _, entry := range entries {
|
||||||
daemon := discoverDaemon(entry.SocketPath)
|
// Check if process is still alive
|
||||||
|
if !isProcessAlive(entry.PID) {
|
||||||
// If connection failed but process is alive, still include basic info
|
// Stale entry - skip and don't add to alive list
|
||||||
if !daemon.Alive {
|
continue
|
||||||
daemon.Alive = true // Process exists, socket just might not be ready
|
}
|
||||||
daemon.WorkspacePath = entry.WorkspacePath
|
|
||||||
daemon.DatabasePath = entry.DatabasePath
|
// Process is alive, add to both lists
|
||||||
daemon.SocketPath = entry.SocketPath
|
aliveEntries = append(aliveEntries, entry)
|
||||||
daemon.PID = entry.PID
|
|
||||||
daemon.Version = entry.Version
|
// Try to connect and get current status
|
||||||
|
daemon := discoverDaemon(entry.SocketPath)
|
||||||
|
|
||||||
|
// If connection failed but process is alive, still include basic info
|
||||||
|
if !daemon.Alive {
|
||||||
|
daemon.Alive = true // Process exists, socket just might not be ready
|
||||||
|
daemon.WorkspacePath = entry.WorkspacePath
|
||||||
|
daemon.DatabasePath = entry.DatabasePath
|
||||||
|
daemon.SocketPath = entry.SocketPath
|
||||||
|
daemon.PID = entry.PID
|
||||||
|
daemon.Version = entry.Version
|
||||||
|
}
|
||||||
|
|
||||||
|
daemons = append(daemons, daemon)
|
||||||
}
|
}
|
||||||
|
|
||||||
daemons = append(daemons, daemon)
|
// Clean up stale entries from registry
|
||||||
}
|
if len(aliveEntries) != len(entries) {
|
||||||
|
if err := r.writeEntriesLocked(aliveEntries); err != nil {
|
||||||
// Clean up stale entries from registry
|
// Log warning but don't fail - we still have the daemon list
|
||||||
if len(aliveEntries) != len(entries) {
|
fmt.Fprintf(os.Stderr, "Warning: failed to cleanup stale registry entries: %v\n", err)
|
||||||
if err := r.writeEntries(aliveEntries); err != nil {
|
}
|
||||||
// Log warning but don't fail - we still have the daemon list
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: failed to cleanup stale registry entries: %v\n", err)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return daemons, nil
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return daemons, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear removes all entries from the registry (for testing)
|
// Clear removes all entries from the registry (for testing)
|
||||||
|
|||||||
@@ -19,3 +19,14 @@ func flockExclusive(f *os.File) error {
|
|||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FlockExclusiveBlocking acquires an exclusive blocking lock on the file.
|
||||||
|
// This will wait until the lock is available.
|
||||||
|
func FlockExclusiveBlocking(f *os.File) error {
|
||||||
|
return unix.Flock(int(f.Fd()), unix.LOCK_EX)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlockUnlock releases a lock on the file.
|
||||||
|
func FlockUnlock(f *os.File) error {
|
||||||
|
return unix.Flock(int(f.Fd()), unix.LOCK_UN)
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ package lockfile
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -13,5 +12,17 @@ var errDaemonLocked = errors.New("daemon lock already held by another process")
|
|||||||
func flockExclusive(f *os.File) error {
|
func flockExclusive(f *os.File) error {
|
||||||
// WASM doesn't support file locking
|
// WASM doesn't support file locking
|
||||||
// In a WASM environment, we're typically single-process anyway
|
// In a WASM environment, we're typically single-process anyway
|
||||||
return fmt.Errorf("file locking not supported in WASM")
|
return nil // No-op in WASM
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlockExclusiveBlocking acquires an exclusive blocking lock on the file.
|
||||||
|
// In WASM, this is a no-op since we're single-process.
|
||||||
|
func FlockExclusiveBlocking(f *os.File) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlockUnlock releases a lock on the file.
|
||||||
|
// In WASM, this is a no-op.
|
||||||
|
func FlockUnlock(f *os.File) error {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,3 +36,34 @@ func flockExclusive(f *os.File) error {
|
|||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FlockExclusiveBlocking acquires an exclusive blocking lock on the file.
|
||||||
|
// This will wait until the lock is available.
|
||||||
|
func FlockExclusiveBlocking(f *os.File) error {
|
||||||
|
// LOCKFILE_EXCLUSIVE_LOCK only (no FAIL_IMMEDIATELY = blocking)
|
||||||
|
const flags = windows.LOCKFILE_EXCLUSIVE_LOCK
|
||||||
|
|
||||||
|
ol := &windows.Overlapped{}
|
||||||
|
|
||||||
|
return windows.LockFileEx(
|
||||||
|
windows.Handle(f.Fd()),
|
||||||
|
flags,
|
||||||
|
0,
|
||||||
|
0xFFFFFFFF,
|
||||||
|
0xFFFFFFFF,
|
||||||
|
ol,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FlockUnlock releases a lock on the file.
|
||||||
|
func FlockUnlock(f *os.File) error {
|
||||||
|
ol := &windows.Overlapped{}
|
||||||
|
|
||||||
|
return windows.UnlockFileEx(
|
||||||
|
windows.Handle(f.Fd()),
|
||||||
|
0,
|
||||||
|
0xFFFFFFFF,
|
||||||
|
0xFFFFFFFF,
|
||||||
|
ol,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user