feat(daemon): add GET /status endpoint (bd-148)

- Add OpStatus operation and StatusResponse type to RPC protocol
- Add workspacePath and dbPath fields to Server struct
- Implement handleStatus() handler with daemon metadata
- Track last activity time with atomic.Value
- Add client.Status() method
- Check for exclusive locks via ShouldSkipDatabase()
- Update all test files to use new NewServer signature
- Add comprehensive status endpoint test

Closes bd-148
This commit is contained in:
Steve Yegge
2025-10-26 17:55:39 -07:00
parent 6bf5c9d2b9
commit 75c959e69c
10 changed files with 203 additions and 43 deletions

View File

@@ -285,7 +285,7 @@ func setupBenchServer(b *testing.B) (*Server, *Client, func(), string) {
b.Fatalf("Failed to create store: %v", err)
}
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
go func() {

View File

@@ -179,6 +179,21 @@ func (c *Client) Ping() error {
return nil
}
// Status retrieves daemon status metadata
func (c *Client) Status() (*StatusResponse, error) {
resp, err := c.Execute(OpStatus, nil)
if err != nil {
return nil, err
}
var status StatusResponse
if err := json.Unmarshal(resp.Data, &status); err != nil {
return nil, fmt.Errorf("failed to unmarshal status response: %w", err)
}
return &status, nil
}
// Health sends a health check request to verify the daemon is healthy
func (c *Client) Health() (*HealthResponse, error) {
resp, err := c.Execute(OpHealth, nil)

View File

@@ -44,7 +44,7 @@ func TestConnectionLimits(t *testing.T) {
os.Setenv("BEADS_DAEMON_MAX_CONNS", "5")
defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS")
srv := NewServer(socketPath, store)
srv := NewServer(socketPath, store, tmpDir, dbPath)
if srv.maxConns != 5 {
t.Fatalf("expected maxConns=5, got %d", srv.maxConns)
}
@@ -166,7 +166,7 @@ func TestRequestTimeout(t *testing.T) {
os.Setenv("BEADS_DAEMON_REQUEST_TIMEOUT", "100ms")
defer os.Unsetenv("BEADS_DAEMON_REQUEST_TIMEOUT")
srv := NewServer(socketPath, store)
srv := NewServer(socketPath, store, tmpDir, dbPath)
if srv.requestTimeout != 100*time.Millisecond {
t.Fatalf("expected timeout=100ms, got %v", srv.requestTimeout)
}
@@ -219,7 +219,7 @@ func TestMemoryPressureDetection(t *testing.T) {
socketPath := filepath.Join(tmpDir, "test.sock")
srv := NewServer(socketPath, store)
srv := NewServer(socketPath, store, tmpDir, dbPath)
// Add some entries to cache
srv.cacheMu.Lock()
@@ -272,7 +272,7 @@ func TestHealthResponseIncludesLimits(t *testing.T) {
os.Setenv("BEADS_DAEMON_MAX_CONNS", "50")
defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS")
srv := NewServer(socketPath, store)
srv := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@@ -7,6 +7,7 @@ import (
// Operation constants for all bd commands
const (
OpPing = "ping"
OpStatus = "status"
OpHealth = "health"
OpMetrics = "metrics"
OpCreate = "create"
@@ -165,6 +166,19 @@ type PingResponse struct {
Version string `json:"version"`
}
// StatusResponse represents the daemon status metadata
type StatusResponse struct {
Version string `json:"version"` // Server/daemon version
WorkspacePath string `json:"workspace_path"` // Absolute path to workspace root
DatabasePath string `json:"database_path"` // Absolute path to database file
SocketPath string `json:"socket_path"` // Path to Unix socket
PID int `json:"pid"` // Process ID
UptimeSeconds float64 `json:"uptime_seconds"` // Time since daemon started
LastActivityTime string `json:"last_activity_time"` // ISO 8601 timestamp of last request
ExclusiveLockActive bool `json:"exclusive_lock_active"` // Whether an exclusive lock is held
ExclusiveLockHolder string `json:"exclusive_lock_holder,omitempty"` // Lock holder name if active
}
// HealthResponse is the response for a health check operation
type HealthResponse struct {
Status string `json:"status"` // "healthy", "degraded", "unhealthy"

View File

@@ -38,7 +38,7 @@ func setupTestServer(t *testing.T) (*Server, *Client, func()) {
t.Fatalf("Failed to create store: %v", err)
}
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
go func() {
@@ -321,7 +321,7 @@ func TestSocketCleanup(t *testing.T) {
}
defer store.Close()
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -433,7 +433,7 @@ func TestDatabaseHandshake(t *testing.T) {
}
defer store1.Close()
server1 := NewServer(socketPath1, store1)
server1 := NewServer(socketPath1, store1, tmpDir1, dbPath1)
ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
go server1.Start(ctx1)
@@ -451,7 +451,7 @@ func TestDatabaseHandshake(t *testing.T) {
}
defer store2.Close()
server2 := NewServer(socketPath2, store2)
server2 := NewServer(socketPath2, store2, tmpDir2, dbPath2)
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
go server2.Start(ctx2)

View File

@@ -60,14 +60,16 @@ type StorageCacheEntry struct {
// Server represents the RPC server that runs in the daemon
type Server struct {
socketPath string
storage storage.Storage // Default storage (for backward compat)
listener net.Listener
mu sync.RWMutex
shutdown bool
shutdownChan chan struct{}
stopOnce sync.Once
doneChan chan struct{} // closed when Start() cleanup is complete
socketPath string
workspacePath string // Absolute path to workspace root
dbPath string // Absolute path to database file
storage storage.Storage // Default storage (for backward compat)
listener net.Listener
mu sync.RWMutex
shutdown bool
shutdownChan chan struct{}
stopOnce sync.Once
doneChan chan struct{} // closed when Start() cleanup is complete
// Per-request storage routing with eviction support
storageCache map[string]*StorageCacheEntry // repoRoot -> entry
cacheMu sync.RWMutex
@@ -75,10 +77,11 @@ type Server struct {
cacheTTL time.Duration
cleanupTicker *time.Ticker
// Health and metrics
startTime time.Time
cacheHits int64
cacheMisses int64
metrics *Metrics
startTime time.Time
lastActivityTime atomic.Value // time.Time - last request timestamp
cacheHits int64
cacheMisses int64
metrics *Metrics
// Connection limiting
maxConns int
activeConns int32 // atomic counter
@@ -93,7 +96,7 @@ type Server struct {
}
// NewServer creates a new RPC server
func NewServer(socketPath string, store storage.Storage) *Server {
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
// Parse config from env vars
maxCacheSize := 50 // default
if env := os.Getenv("BEADS_DAEMON_MAX_CACHE_SIZE"); env != "" {
@@ -126,8 +129,10 @@ func NewServer(socketPath string, store storage.Storage) *Server {
}
}
return &Server{
s := &Server{
socketPath: socketPath,
workspacePath: workspacePath,
dbPath: dbPath,
storage: store,
storageCache: make(map[string]*StorageCacheEntry),
maxCacheSize: maxCacheSize,
@@ -141,6 +146,8 @@ func NewServer(socketPath string, store storage.Storage) *Server {
requestTimeout: requestTimeout,
readyChan: make(chan struct{}),
}
s.lastActivityTime.Store(time.Now())
return s
}
// Start starts the RPC server and listens for connections
@@ -629,10 +636,15 @@ func (s *Server) handleRequest(req *Request) Response {
}
}
// Update last activity timestamp
s.lastActivityTime.Store(time.Now())
var resp Response
switch req.Operation {
case OpPing:
resp = s.handlePing(req)
case OpStatus:
resp = s.handleStatus(req)
case OpHealth:
resp = s.handleHealth(req)
case OpMetrics:
@@ -753,6 +765,39 @@ func (s *Server) handlePing(_ *Request) Response {
}
}
func (s *Server) handleStatus(_ *Request) Response {
// Get last activity timestamp
lastActivity := s.lastActivityTime.Load().(time.Time)
// Check for exclusive lock
lockActive := false
lockHolder := ""
if s.workspacePath != "" {
if skip, holder, _ := types.ShouldSkipDatabase(s.workspacePath); skip {
lockActive = true
lockHolder = holder
}
}
statusResp := StatusResponse{
Version: ServerVersion,
WorkspacePath: s.workspacePath,
DatabasePath: s.dbPath,
SocketPath: s.socketPath,
PID: os.Getpid(),
UptimeSeconds: time.Since(s.startTime).Seconds(),
LastActivityTime: lastActivity.Format(time.RFC3339),
ExclusiveLockActive: lockActive,
ExclusiveLockHolder: lockHolder,
}
data, _ := json.Marshal(statusResp)
return Response{
Success: true,
Data: data,
}
}
func (s *Server) handleHealth(req *Request) Response {
start := time.Now()

View File

@@ -23,7 +23,7 @@ func TestStorageCacheEviction_TTL(t *testing.T) {
// Create server with short TTL for testing
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
server.cacheTTL = 100 * time.Millisecond // Short TTL for testing
defer server.Stop()
@@ -93,7 +93,7 @@ func TestStorageCacheEviction_LRU(t *testing.T) {
// Create server with small cache size
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
server.maxCacheSize = 2 // Only keep 2 entries
server.cacheTTL = 1 * time.Hour // Long TTL so we test LRU
defer server.Stop()
@@ -178,7 +178,7 @@ func TestStorageCacheEviction_LastAccessUpdate(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
defer server.Stop()
// Create test database
@@ -242,7 +242,7 @@ func TestStorageCacheEviction_EnvVars(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
defer server.Stop()
// Verify config was parsed
@@ -268,7 +268,7 @@ func TestStorageCacheEviction_CleanupOnStop(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
// Create test database and populate cache
dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
@@ -320,7 +320,7 @@ func TestStorageCacheEviction_CanonicalKey(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
defer server.Stop()
// Create test database
@@ -373,7 +373,7 @@ func TestStorageCacheEviction_ImmediateLRU(t *testing.T) {
// Create server with max cache size of 2
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
server.maxCacheSize = 2
server.cacheTTL = 1 * time.Hour // Long TTL
defer server.Stop()
@@ -425,7 +425,7 @@ func TestStorageCacheEviction_InvalidTTL(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
defer server.Stop()
// Should fall back to default (30 minutes)
@@ -448,7 +448,7 @@ func TestStorageCacheEviction_ReopenAfterEviction(t *testing.T) {
// Create server with short TTL
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
server.cacheTTL = 50 * time.Millisecond
defer server.Stop()
@@ -510,7 +510,7 @@ func TestStorageCacheEviction_StopIdempotent(t *testing.T) {
// Create server
socketPath := filepath.Join(tmpDir, "test.sock")
server := NewServer(socketPath, mainStore)
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
// Stop multiple times - should not panic
if err := server.Stop(); err != nil {

View File

@@ -0,0 +1,85 @@
package rpc
import (
"context"
"os"
"path/filepath"
"testing"
"time"
"github.com/steveyegge/beads/internal/storage/sqlite"
)
func TestStatusEndpoint(t *testing.T) {
tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
socketPath := filepath.Join(tmpDir, "test.sock")
store, err := sqlite.New(dbPath)
if err != nil {
t.Fatalf("failed to create storage: %v", err)
}
defer store.Close()
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
_ = server.Start(ctx)
}()
<-server.WaitReady()
defer server.Stop()
client, err := TryConnect(socketPath)
if err != nil {
t.Fatalf("failed to connect: %v", err)
}
if client == nil {
t.Fatal("client is nil")
}
defer client.Close()
// Test status endpoint
status, err := client.Status()
if err != nil {
t.Fatalf("status call failed: %v", err)
}
// Verify response fields
if status.Version == "" {
t.Error("expected version to be set")
}
if status.WorkspacePath != tmpDir {
t.Errorf("expected workspace path %s, got %s", tmpDir, status.WorkspacePath)
}
if status.DatabasePath != dbPath {
t.Errorf("expected database path %s, got %s", dbPath, status.DatabasePath)
}
if status.SocketPath != socketPath {
t.Errorf("expected socket path %s, got %s", socketPath, status.SocketPath)
}
if status.PID != os.Getpid() {
t.Errorf("expected PID %d, got %d", os.Getpid(), status.PID)
}
if status.UptimeSeconds <= 0 {
t.Error("expected positive uptime")
}
if status.LastActivityTime == "" {
t.Error("expected last activity time to be set")
}
if status.ExclusiveLockActive {
t.Error("expected no exclusive lock in test")
}
// Verify last activity time is recent
lastActivity, err := time.Parse(time.RFC3339, status.LastActivityTime)
if err != nil {
t.Errorf("failed to parse last activity time: %v", err)
}
if time.Since(lastActivity) > 5*time.Second {
t.Errorf("last activity time too old: %v", lastActivity)
}
}

View File

@@ -96,7 +96,7 @@ func TestVersionCompatibility(t *testing.T) {
ServerVersion = tt.serverVersion
defer func() { ServerVersion = originalServerVersion }()
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -187,7 +187,7 @@ func TestHealthCheckIncludesVersionInfo(t *testing.T) {
ServerVersion = testVersion100
ClientVersion = testVersion100
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -251,7 +251,7 @@ func TestIncompatibleVersionInHealth(t *testing.T) {
ServerVersion = testVersion100
ClientVersion = "2.0.0"
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -371,7 +371,7 @@ func TestPingAndHealthBypassVersionCheck(t *testing.T) {
ServerVersion = testVersion100
ClientVersion = "2.0.0"
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -445,7 +445,7 @@ func TestMetricsOperation(t *testing.T) {
ServerVersion = testVersion100
ClientVersion = testVersion100
server := NewServer(socketPath, store)
server := NewServer(socketPath, store, tmpDir, dbPath)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()