feat(daemons): implement discovery and list command (bd-146, bd-147)
- Add daemon discovery mechanism with socket scanning
- Implement depth-limited filesystem walk to avoid hangs
- Add DaemonInfo struct with metadata collection
- Create 'bd daemons list' command with table and JSON output
- Add FindDaemonByWorkspace and CleanupStaleSockets utilities
- Fix workspace path to be parent of .beads directory
- Add comprehensive tests for discovery functionality

Closes bd-146
Closes bd-147
This commit is contained in:
203
internal/daemon/discovery.go
Normal file
203
internal/daemon/discovery.go
Normal file
@@ -0,0 +1,203 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
)
|
||||
|
||||
// walkWithDepth recursively visits the non-directory entries under root,
// descending through directories until currentDepth exceeds maxDepth.
//
// currentDepth is the depth of root itself; the top-level caller passes 0, so
// with maxDepth == 0 only the files directly inside root are visited. fn is
// invoked for every non-directory entry with its joined path and file info.
// A non-nil error returned by fn aborts the walk and is returned unchanged.
//
// The walk is best-effort: directories that cannot be read and entries whose
// file info cannot be obtained are skipped silently. Hidden directories other
// than ".beads" (which covers ".git"), plus "node_modules" and "vendor", are
// never descended into.
func walkWithDepth(root string, currentDepth, maxDepth int, fn func(path string, info os.FileInfo) error) error {
	if currentDepth > maxDepth {
		return nil
	}

	entries, err := os.ReadDir(root)
	if err != nil {
		// Skip directories we can't read.
		return nil
	}

	for _, entry := range entries {
		name := entry.Name()
		path := filepath.Join(root, name)

		if entry.IsDir() {
			// Prune directories that won't contain beads databases.
			if strings.HasPrefix(name, ".") && name != ".beads" {
				continue
			}
			if name == "node_modules" || name == "vendor" {
				continue
			}
			if err := walkWithDepth(path, currentDepth+1, maxDepth, fn); err != nil {
				return err
			}
			continue
		}

		// Only non-directory entries need their file info resolved.
		info, err := entry.Info()
		if err != nil {
			continue
		}
		if err := fn(path, info); err != nil {
			return err
		}
	}

	return nil
}
|
||||
|
||||
// DaemonInfo describes one daemon socket found during discovery, together
// with whatever the daemon reported about itself.
//
// SocketPath is always set. The remaining metadata fields are copied from the
// daemon's Status RPC response and are only populated when Alive is true;
// otherwise Error explains why the daemon could not be queried.
type DaemonInfo struct {
	// WorkspacePath is the workspace path reported by the daemon.
	WorkspacePath string
	// DatabasePath is the database path reported by the daemon.
	DatabasePath string
	// SocketPath is the filesystem path of the socket that was probed.
	SocketPath string
	// PID is the process ID reported by the daemon.
	PID int
	// Version is the version string reported by the daemon.
	Version string
	// UptimeSeconds is the uptime reported by the daemon.
	UptimeSeconds float64
	// LastActivityTime is the last-activity value reported by the daemon,
	// passed through as a string.
	LastActivityTime string
	// ExclusiveLockActive mirrors the daemon's exclusive-lock status flag.
	ExclusiveLockActive bool
	// ExclusiveLockHolder mirrors the daemon's exclusive-lock holder value.
	ExclusiveLockHolder string
	// Alive is true only when both the connection and the Status call
	// succeeded.
	Alive bool
	// Error holds a human-readable failure description when Alive is false.
	Error string
}
|
||||
|
||||
// DiscoverDaemons scans the filesystem for running bd daemons
|
||||
// It searches common locations and uses the Status RPC endpoint to gather metadata
|
||||
func DiscoverDaemons(searchRoots []string) ([]DaemonInfo, error) {
|
||||
var daemons []DaemonInfo
|
||||
seen := make(map[string]bool)
|
||||
|
||||
// If no search roots provided, use common locations
|
||||
if len(searchRoots) == 0 {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get home directory: %w", err)
|
||||
}
|
||||
searchRoots = []string{
|
||||
home,
|
||||
"/tmp",
|
||||
}
|
||||
// Also add current directory if in a git repo
|
||||
if cwd, err := os.Getwd(); err == nil {
|
||||
searchRoots = append(searchRoots, cwd)
|
||||
}
|
||||
}
|
||||
|
||||
// Search for .beads/bd.sock files (limit depth to avoid traversing entire filesystem)
|
||||
for _, root := range searchRoots {
|
||||
maxDepth := 10 // Limit recursion depth
|
||||
if err := walkWithDepth(root, 0, maxDepth, func(path string, info os.FileInfo) error {
|
||||
// Skip if not a socket file
|
||||
if info.Name() != "bd.sock" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip if already seen this socket
|
||||
if seen[path] {
|
||||
return nil
|
||||
}
|
||||
seen[path] = true
|
||||
|
||||
// Try to connect and get status
|
||||
daemon := discoverDaemon(path)
|
||||
daemons = append(daemons, daemon)
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
// Continue searching other roots even if one fails
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return daemons, nil
|
||||
}
|
||||
|
||||
// discoverDaemon attempts to connect to a daemon socket and retrieve its status
|
||||
func discoverDaemon(socketPath string) DaemonInfo {
|
||||
daemon := DaemonInfo{
|
||||
SocketPath: socketPath,
|
||||
Alive: false,
|
||||
}
|
||||
|
||||
// Try to connect with short timeout
|
||||
client, err := rpc.TryConnectWithTimeout(socketPath, 500*time.Millisecond)
|
||||
if err != nil {
|
||||
daemon.Error = fmt.Sprintf("failed to connect: %v", err)
|
||||
return daemon
|
||||
}
|
||||
if client == nil {
|
||||
daemon.Error = "daemon not responding or unhealthy"
|
||||
return daemon
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Get status
|
||||
status, err := client.Status()
|
||||
if err != nil {
|
||||
daemon.Error = fmt.Sprintf("failed to get status: %v", err)
|
||||
return daemon
|
||||
}
|
||||
|
||||
// Populate daemon info from status
|
||||
daemon.Alive = true
|
||||
daemon.WorkspacePath = status.WorkspacePath
|
||||
daemon.DatabasePath = status.DatabasePath
|
||||
daemon.PID = status.PID
|
||||
daemon.Version = status.Version
|
||||
daemon.UptimeSeconds = status.UptimeSeconds
|
||||
daemon.LastActivityTime = status.LastActivityTime
|
||||
daemon.ExclusiveLockActive = status.ExclusiveLockActive
|
||||
daemon.ExclusiveLockHolder = status.ExclusiveLockHolder
|
||||
|
||||
return daemon
|
||||
}
|
||||
|
||||
// FindDaemonByWorkspace finds a daemon serving a specific workspace
|
||||
func FindDaemonByWorkspace(workspacePath string) (*DaemonInfo, error) {
|
||||
// First try the socket in the workspace itself
|
||||
socketPath := filepath.Join(workspacePath, ".beads", "bd.sock")
|
||||
if _, err := os.Stat(socketPath); err == nil {
|
||||
daemon := discoverDaemon(socketPath)
|
||||
if daemon.Alive {
|
||||
return &daemon, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to discovering all daemons
|
||||
daemons, err := DiscoverDaemons([]string{workspacePath})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, daemon := range daemons {
|
||||
if daemon.WorkspacePath == workspacePath && daemon.Alive {
|
||||
return &daemon, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no daemon found for workspace: %s", workspacePath)
|
||||
}
|
||||
|
||||
// CleanupStaleSockets removes socket files for dead daemons
|
||||
func CleanupStaleSockets(daemons []DaemonInfo) (int, error) {
|
||||
cleaned := 0
|
||||
for _, daemon := range daemons {
|
||||
if !daemon.Alive && daemon.SocketPath != "" {
|
||||
if err := os.Remove(daemon.SocketPath); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return cleaned, fmt.Errorf("failed to remove stale socket %s: %w", daemon.SocketPath, err)
|
||||
}
|
||||
} else {
|
||||
cleaned++
|
||||
}
|
||||
}
|
||||
}
|
||||
return cleaned, nil
|
||||
}
|
||||
117
internal/daemon/discovery_test.go
Normal file
117
internal/daemon/discovery_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/steveyegge/beads/internal/rpc"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
)
|
||||
|
||||
func TestDiscoverDaemon(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
workspace := filepath.Join(tmpDir, ".beads")
|
||||
os.MkdirAll(workspace, 0755)
|
||||
|
||||
// Start daemon
|
||||
dbPath := filepath.Join(workspace, "test.db")
|
||||
socketPath := filepath.Join(workspace, "bd.sock")
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create storage: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
server := rpc.NewServer(socketPath, store, tmpDir, dbPath)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go server.Start(ctx)
|
||||
<-server.WaitReady()
|
||||
defer server.Stop()
|
||||
|
||||
// Test discoverDaemon directly
|
||||
daemon := discoverDaemon(socketPath)
|
||||
if !daemon.Alive {
|
||||
t.Errorf("daemon not alive: %s", daemon.Error)
|
||||
}
|
||||
if daemon.PID != os.Getpid() {
|
||||
t.Errorf("wrong PID: expected %d, got %d", os.Getpid(), daemon.PID)
|
||||
}
|
||||
if daemon.UptimeSeconds <= 0 {
|
||||
t.Errorf("invalid uptime: %f", daemon.UptimeSeconds)
|
||||
}
|
||||
if daemon.WorkspacePath != tmpDir {
|
||||
t.Errorf("wrong workspace: expected %s, got %s", tmpDir, daemon.WorkspacePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindDaemonByWorkspace(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
workspace := filepath.Join(tmpDir, ".beads")
|
||||
os.MkdirAll(workspace, 0755)
|
||||
|
||||
// Start daemon
|
||||
dbPath := filepath.Join(workspace, "test.db")
|
||||
socketPath := filepath.Join(workspace, "bd.sock")
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create storage: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
server := rpc.NewServer(socketPath, store, tmpDir, dbPath)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go server.Start(ctx)
|
||||
<-server.WaitReady()
|
||||
defer server.Stop()
|
||||
|
||||
// Find daemon by workspace
|
||||
daemon, err := FindDaemonByWorkspace(tmpDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to find daemon: %v", err)
|
||||
}
|
||||
if daemon == nil {
|
||||
t.Fatal("daemon not found")
|
||||
}
|
||||
if !daemon.Alive {
|
||||
t.Errorf("daemon not alive: %s", daemon.Error)
|
||||
}
|
||||
if daemon.WorkspacePath != tmpDir {
|
||||
t.Errorf("wrong workspace: expected %s, got %s", tmpDir, daemon.WorkspacePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupStaleSockets(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create stale socket file
|
||||
stalePath := filepath.Join(tmpDir, "stale.sock")
|
||||
if err := os.WriteFile(stalePath, []byte{}, 0644); err != nil {
|
||||
t.Fatalf("failed to create stale socket: %v", err)
|
||||
}
|
||||
|
||||
daemons := []DaemonInfo{
|
||||
{
|
||||
SocketPath: stalePath,
|
||||
Alive: false,
|
||||
},
|
||||
}
|
||||
|
||||
cleaned, err := CleanupStaleSockets(daemons)
|
||||
if err != nil {
|
||||
t.Fatalf("cleanup failed: %v", err)
|
||||
}
|
||||
if cleaned != 1 {
|
||||
t.Errorf("expected 1 cleaned, got %d", cleaned)
|
||||
}
|
||||
|
||||
// Verify socket was removed
|
||||
if _, err := os.Stat(stalePath); !os.IsNotExist(err) {
|
||||
t.Error("stale socket still exists")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user