From 75c959e69cfced1b575ec23662bdbeb20241048c Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Sun, 26 Oct 2025 17:55:39 -0700 Subject: [PATCH] feat(daemon): add GET /status endpoint (bd-148) - Add OpStatus operation and StatusResponse type to RPC protocol - Add workspacePath and dbPath fields to Server struct - Implement handleStatus() handler with daemon metadata - Track last activity time with atomic.Value - Add client.Status() method - Check for exclusive locks via ShouldSkipDatabase() - Update all test files to use new NewServer signature - Add comprehensive status endpoint test Closes bd-148 --- cmd/bd/daemon.go | 11 ++-- internal/rpc/bench_test.go | 2 +- internal/rpc/client.go | 15 +++++ internal/rpc/limits_test.go | 8 +-- internal/rpc/protocol.go | 14 +++++ internal/rpc/rpc_test.go | 8 +-- internal/rpc/server.go | 73 +++++++++++++++++++----- internal/rpc/server_eviction_test.go | 20 +++---- internal/rpc/status_test.go | 85 ++++++++++++++++++++++++++++ internal/rpc/version_test.go | 10 ++-- 10 files changed, 203 insertions(+), 43 deletions(-) create mode 100644 internal/rpc/status_test.go diff --git a/cmd/bd/daemon.go b/cmd/bd/daemon.go index 6907a51b..f435b4ee 100644 --- a/cmd/bd/daemon.go +++ b/cmd/bd/daemon.go @@ -857,11 +857,11 @@ func setupDaemonLock(pidFile string, global bool, log daemonLogger) (io.Closer, return lock, nil } -func startRPCServer(ctx context.Context, socketPath string, store storage.Storage, log daemonLogger) (*rpc.Server, chan error, error) { +func startRPCServer(ctx context.Context, socketPath string, store storage.Storage, workspacePath string, dbPath string, log daemonLogger) (*rpc.Server, chan error, error) { // Sync daemon version with CLI version rpc.ServerVersion = Version - server := rpc.NewServer(socketPath, store) + server := rpc.NewServer(socketPath, store, workspacePath, dbPath) serverErrChan := make(chan error, 1) go func() { @@ -896,7 +896,7 @@ func runGlobalDaemon(log daemonLogger) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - server, _, err := startRPCServer(ctx, socketPath, nil, log) + server, _, err := startRPCServer(ctx, socketPath, nil, globalDir, "", log) if err != nil { return } @@ -1079,11 +1079,12 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p defer func() { _ = store.Close() }() log.log("Database opened: %s", daemonDBPath) - socketPath := filepath.Join(filepath.Dir(daemonDBPath), "bd.sock") + workspacePath := filepath.Dir(daemonDBPath) + socketPath := filepath.Join(workspacePath, "bd.sock") ctx, cancel := context.WithCancel(context.Background()) defer cancel() - server, serverErrChan, err := startRPCServer(ctx, socketPath, store, log) + server, serverErrChan, err := startRPCServer(ctx, socketPath, store, workspacePath, daemonDBPath, log) if err != nil { return } diff --git a/internal/rpc/bench_test.go b/internal/rpc/bench_test.go index 97c195b2..f89e419a 100644 --- a/internal/rpc/bench_test.go +++ b/internal/rpc/bench_test.go @@ -285,7 +285,7 @@ func setupBenchServer(b *testing.B) (*Server, *Client, func(), string) { b.Fatalf("Failed to create store: %v", err) } - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/internal/rpc/client.go b/internal/rpc/client.go index 06781a78..3bd98e17 100644 --- a/internal/rpc/client.go +++ b/internal/rpc/client.go @@ -179,6 +179,21 @@ func (c *Client) Ping() error { return nil } +// Status retrieves daemon status metadata +func (c *Client) Status() (*StatusResponse, error) { + resp, err := c.Execute(OpStatus, nil) + if err != nil { + return nil, err + } + + var status StatusResponse + if err := json.Unmarshal(resp.Data, &status); err != nil { + return nil, fmt.Errorf("failed to unmarshal status response: %w", err) + } + + return &status, nil +} + // Health sends a health check request to verify the daemon is healthy func (c *Client) Health() (*HealthResponse, error) { resp, err := c.Execute(OpHealth, nil) diff --git a/internal/rpc/limits_test.go b/internal/rpc/limits_test.go index f47251ee..dfa1fef4 100644 --- a/internal/rpc/limits_test.go +++ b/internal/rpc/limits_test.go @@ -44,7 +44,7 @@ func TestConnectionLimits(t *testing.T) { os.Setenv("BEADS_DAEMON_MAX_CONNS", "5") defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS") - srv := NewServer(socketPath, store) + srv := NewServer(socketPath, store, tmpDir, dbPath) if srv.maxConns != 5 { t.Fatalf("expected maxConns=5, got %d", srv.maxConns) } @@ -166,7 +166,7 @@ func TestRequestTimeout(t *testing.T) { os.Setenv("BEADS_DAEMON_REQUEST_TIMEOUT", "100ms") defer os.Unsetenv("BEADS_DAEMON_REQUEST_TIMEOUT") - srv := NewServer(socketPath, store) + srv := NewServer(socketPath, store, tmpDir, dbPath) if srv.requestTimeout != 100*time.Millisecond { t.Fatalf("expected timeout=100ms, got %v", srv.requestTimeout) } @@ -219,7 +219,7 @@ func TestMemoryPressureDetection(t *testing.T) { socketPath := filepath.Join(tmpDir, "test.sock") - srv := NewServer(socketPath, store) + srv := NewServer(socketPath, store, tmpDir, dbPath) // Add some entries to cache srv.cacheMu.Lock() @@ -272,7 +272,7 @@ func TestHealthResponseIncludesLimits(t *testing.T) { os.Setenv("BEADS_DAEMON_MAX_CONNS", "50") defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS") - srv := NewServer(socketPath, store) + srv := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/rpc/protocol.go b/internal/rpc/protocol.go index 02ab8c46..5f7f99ba 100644 --- a/internal/rpc/protocol.go +++ b/internal/rpc/protocol.go @@ -7,6 +7,7 @@ import ( // Operation constants for all bd commands const ( OpPing = "ping" + OpStatus = "status" OpHealth = "health" OpMetrics = "metrics" OpCreate = "create" @@ -165,6 +166,19 @@ type PingResponse struct { Version string `json:"version"` } +// StatusResponse represents the daemon status metadata +type StatusResponse struct { + Version string `json:"version"` // Server/daemon version + WorkspacePath string `json:"workspace_path"` // Absolute path to workspace root + DatabasePath string `json:"database_path"` // Absolute path to database file + SocketPath string `json:"socket_path"` // Path to Unix socket + PID int `json:"pid"` // Process ID + UptimeSeconds float64 `json:"uptime_seconds"` // Time since daemon started + LastActivityTime string `json:"last_activity_time"` // ISO 8601 timestamp of last request + ExclusiveLockActive bool `json:"exclusive_lock_active"` // Whether an exclusive lock is held + ExclusiveLockHolder string `json:"exclusive_lock_holder,omitempty"` // Lock holder name if active +} + // HealthResponse is the response for a health check operation type HealthResponse struct { Status string `json:"status"` // "healthy", "degraded", "unhealthy" diff --git a/internal/rpc/rpc_test.go b/internal/rpc/rpc_test.go index f5d13f37..d4865196 100644 --- a/internal/rpc/rpc_test.go +++ b/internal/rpc/rpc_test.go @@ -38,7 +38,7 @@ func setupTestServer(t *testing.T) (*Server, *Client, func()) { t.Fatalf("Failed to create store: %v", err) } - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) go func() { @@ -321,7 +321,7 @@ func TestSocketCleanup(t *testing.T) { } defer store.Close() - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -433,7 +433,7 @@ func TestDatabaseHandshake(t *testing.T) { } defer store1.Close() - server1 := NewServer(socketPath1, store1) + server1 := NewServer(socketPath1, store1, tmpDir1, dbPath1) ctx1, cancel1 := context.WithCancel(context.Background()) defer cancel1() go server1.Start(ctx1) @@ -451,7 +451,7 @@ func TestDatabaseHandshake(t *testing.T) { } defer store2.Close() - server2 := NewServer(socketPath2, store2) + server2 := NewServer(socketPath2, store2, tmpDir2, dbPath2) ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() go server2.Start(ctx2) diff --git a/internal/rpc/server.go b/internal/rpc/server.go index 3ed4c69e..fb43d95b 100644 --- a/internal/rpc/server.go +++ b/internal/rpc/server.go @@ -60,14 +60,16 @@ type StorageCacheEntry struct { // Server represents the RPC server that runs in the daemon type Server struct { - socketPath string - storage storage.Storage // Default storage (for backward compat) - listener net.Listener - mu sync.RWMutex - shutdown bool - shutdownChan chan struct{} - stopOnce sync.Once - doneChan chan struct{} // closed when Start() cleanup is complete + socketPath string + workspacePath string // Absolute path to workspace root + dbPath string // Absolute path to database file + storage storage.Storage // Default storage (for backward compat) + listener net.Listener + mu sync.RWMutex + shutdown bool + shutdownChan chan struct{} + stopOnce sync.Once + doneChan chan struct{} // closed when Start() cleanup is complete // Per-request storage routing with eviction support storageCache map[string]*StorageCacheEntry // repoRoot -> entry cacheMu sync.RWMutex @@ -75,10 +77,11 @@ type Server struct { cacheTTL time.Duration cleanupTicker *time.Ticker // Health and metrics - startTime time.Time - cacheHits int64 - cacheMisses int64 - metrics *Metrics + startTime time.Time + lastActivityTime atomic.Value // time.Time - last request timestamp + cacheHits int64 + cacheMisses int64 + metrics *Metrics // Connection limiting maxConns int activeConns int32 // atomic counter @@ -93,7 +96,7 @@ type Server struct { } // NewServer creates a new RPC server -func NewServer(socketPath string, store storage.Storage) *Server { +func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server { // Parse config from env vars maxCacheSize := 50 // default if env := os.Getenv("BEADS_DAEMON_MAX_CACHE_SIZE"); env != "" { @@ -126,8 +129,10 @@ func NewServer(socketPath string, store storage.Storage) *Server { } } - return &Server{ + s := &Server{ socketPath: socketPath, + workspacePath: workspacePath, + dbPath: dbPath, storage: store, storageCache: make(map[string]*StorageCacheEntry), maxCacheSize: maxCacheSize, @@ -141,6 +146,8 @@ func NewServer(socketPath string, store storage.Storage) *Server { requestTimeout: requestTimeout, readyChan: make(chan struct{}), } + s.lastActivityTime.Store(time.Now()) + return s } // Start starts the RPC server and listens for connections @@ -629,10 +636,15 @@ func (s *Server) handleRequest(req *Request) Response { } } + // Update last activity timestamp + s.lastActivityTime.Store(time.Now()) + var resp Response switch req.Operation { case OpPing: resp = s.handlePing(req) + case OpStatus: + resp = s.handleStatus(req) case OpHealth: resp = s.handleHealth(req) case OpMetrics: @@ -753,6 +765,39 @@ func (s *Server) handlePing(_ *Request) Response { } } +func (s *Server) handleStatus(_ *Request) Response { + // Get last activity timestamp + lastActivity := s.lastActivityTime.Load().(time.Time) + + // Check for exclusive lock + lockActive := false + lockHolder := "" + if s.workspacePath != "" { + if skip, holder, _ := types.ShouldSkipDatabase(s.workspacePath); skip { + lockActive = true + lockHolder = holder + } + } + + statusResp := StatusResponse{ + Version: ServerVersion, + WorkspacePath: s.workspacePath, + DatabasePath: s.dbPath, + SocketPath: s.socketPath, + PID: os.Getpid(), + UptimeSeconds: time.Since(s.startTime).Seconds(), + LastActivityTime: lastActivity.Format(time.RFC3339), + ExclusiveLockActive: lockActive, + ExclusiveLockHolder: lockHolder, + } + + data, _ := json.Marshal(statusResp) + return Response{ + Success: true, + Data: data, + } +} + func (s *Server) handleHealth(req *Request) Response { start := time.Now() diff --git a/internal/rpc/server_eviction_test.go b/internal/rpc/server_eviction_test.go index 95e942e8..729ed1dd 100644 --- a/internal/rpc/server_eviction_test.go +++ b/internal/rpc/server_eviction_test.go @@ -23,7 +23,7 @@ func TestStorageCacheEviction_TTL(t *testing.T) { // Create server with short TTL for testing socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) server.cacheTTL = 100 * time.Millisecond // Short TTL for testing defer server.Stop() @@ -93,7 +93,7 @@ func TestStorageCacheEviction_LRU(t *testing.T) { // Create server with small cache size socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) server.maxCacheSize = 2 // Only keep 2 entries server.cacheTTL = 1 * time.Hour // Long TTL so we test LRU defer server.Stop() @@ -178,7 +178,7 @@ func TestStorageCacheEviction_LastAccessUpdate(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) defer server.Stop() // Create test database @@ -242,7 +242,7 @@ func TestStorageCacheEviction_EnvVars(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) defer server.Stop() // Verify config was parsed @@ -268,7 +268,7 @@ func TestStorageCacheEviction_CleanupOnStop(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) // Create test database and populate cache dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") @@ -320,7 +320,7 @@ func TestStorageCacheEviction_CanonicalKey(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) defer server.Stop() // Create test database @@ -373,7 +373,7 @@ func TestStorageCacheEviction_ImmediateLRU(t *testing.T) { // Create server with max cache size of 2 socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) server.maxCacheSize = 2 server.cacheTTL = 1 * time.Hour // Long TTL defer server.Stop() @@ -425,7 +425,7 @@ func TestStorageCacheEviction_InvalidTTL(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) defer server.Stop() // Should fall back to default (30 minutes) @@ -448,7 +448,7 @@ func TestStorageCacheEviction_ReopenAfterEviction(t *testing.T) { // Create server with short TTL socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) server.cacheTTL = 50 * time.Millisecond defer server.Stop() @@ -510,7 +510,7 @@ func TestStorageCacheEviction_StopIdempotent(t *testing.T) { // Create server socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore) + server := NewServer(socketPath, mainStore, tmpDir, mainDB) // Stop multiple times - should not panic if err := server.Stop(); err != nil { diff --git a/internal/rpc/status_test.go b/internal/rpc/status_test.go new file mode 100644 index 00000000..2fd89d70 --- /dev/null +++ b/internal/rpc/status_test.go @@ -0,0 +1,85 @@ +package rpc + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/steveyegge/beads/internal/storage/sqlite" +) + +func TestStatusEndpoint(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "test.db") + socketPath := filepath.Join(tmpDir, "test.sock") + + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatalf("failed to create storage: %v", err) + } + defer store.Close() + + server := NewServer(socketPath, store, tmpDir, dbPath) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + _ = server.Start(ctx) + }() + + <-server.WaitReady() + defer server.Stop() + + client, err := TryConnect(socketPath) + if err != nil { + t.Fatalf("failed to connect: %v", err) + } + if client == nil { + t.Fatal("client is nil") + } + defer client.Close() + + // Test status endpoint + status, err := client.Status() + if err != nil { + t.Fatalf("status call failed: %v", err) + } + + // Verify response fields + if status.Version == "" { + t.Error("expected version to be set") + } + if status.WorkspacePath != tmpDir { + t.Errorf("expected workspace path %s, got %s", tmpDir, status.WorkspacePath) + } + if status.DatabasePath != dbPath { + t.Errorf("expected database path %s, got %s", dbPath, status.DatabasePath) + } + if status.SocketPath != socketPath { + t.Errorf("expected socket path %s, got %s", socketPath, status.SocketPath) + } + if status.PID != os.Getpid() { + t.Errorf("expected PID %d, got %d", os.Getpid(), status.PID) + } + if status.UptimeSeconds <= 0 { + t.Error("expected positive uptime") + } + if status.LastActivityTime == "" { + t.Error("expected last activity time to be set") + } + if status.ExclusiveLockActive { + t.Error("expected no exclusive lock in test") + } + + // Verify last activity time is recent + lastActivity, err := time.Parse(time.RFC3339, status.LastActivityTime) + if err != nil { + t.Errorf("failed to parse last activity time: %v", err) + } + if time.Since(lastActivity) > 5*time.Second { + t.Errorf("last activity time too old: %v", lastActivity) + } +} diff --git a/internal/rpc/version_test.go b/internal/rpc/version_test.go index 7b437820..7ff490b2 100644 --- a/internal/rpc/version_test.go +++ b/internal/rpc/version_test.go @@ -96,7 +96,7 @@ func TestVersionCompatibility(t *testing.T) { ServerVersion = tt.serverVersion defer func() { ServerVersion = originalServerVersion }() - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -187,7 +187,7 @@ func TestHealthCheckIncludesVersionInfo(t *testing.T) { ServerVersion = testVersion100 ClientVersion = testVersion100 - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -251,7 +251,7 @@ func TestIncompatibleVersionInHealth(t *testing.T) { ServerVersion = testVersion100 ClientVersion = "2.0.0" - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -371,7 +371,7 @@ func TestPingAndHealthBypassVersionCheck(t *testing.T) { ServerVersion = testVersion100 ClientVersion = "2.0.0" - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -445,7 +445,7 @@ func TestMetricsOperation(t *testing.T) { ServerVersion = testVersion100 ClientVersion = testVersion100 - server := NewServer(socketPath, store) + server := NewServer(socketPath, store, tmpDir, dbPath) ctx, cancel := context.WithCancel(context.Background()) defer cancel()