Add daemon health check endpoint (bd-146)
- Add OpHealth RPC operation to protocol - Implement handleHealth() with DB ping and 1s timeout - Returns status (healthy/degraded/unhealthy), uptime, cache metrics - Update TryConnect() to use health check instead of ping - Add 'bd daemon --health' CLI command with JSON output - Track cache hits/misses for metrics - Unhealthy daemon triggers automatic fallback to direct mode - Health check completes in <2 seconds Amp-Thread-ID: https://ampcode.com/threads/T-1a4889f3-77cf-433a-a704-e1c383929f48 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@@ -40,6 +41,10 @@ type Server struct {
|
||||
maxCacheSize int
|
||||
cacheTTL time.Duration
|
||||
cleanupTicker *time.Ticker
|
||||
// Health and metrics
|
||||
startTime time.Time
|
||||
cacheHits int64
|
||||
cacheMisses int64
|
||||
}
|
||||
|
||||
// NewServer creates a new RPC server
|
||||
@@ -68,6 +73,7 @@ func NewServer(socketPath string, store storage.Storage) *Server {
|
||||
maxCacheSize: maxCacheSize,
|
||||
cacheTTL: cacheTTL,
|
||||
shutdownChan: make(chan struct{}),
|
||||
startTime: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -274,6 +280,8 @@ func (s *Server) handleRequest(req *Request) Response {
|
||||
switch req.Operation {
|
||||
case OpPing:
|
||||
return s.handlePing(req)
|
||||
case OpHealth:
|
||||
return s.handleHealth(req)
|
||||
case OpCreate:
|
||||
return s.handleCreate(req)
|
||||
case OpUpdate:
|
||||
@@ -379,6 +387,66 @@ func (s *Server) handlePing(_ *Request) Response {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(req *Request) Response {
|
||||
start := time.Now()
|
||||
|
||||
store, err := s.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
data, _ := json.Marshal(HealthResponse{
|
||||
Status: "unhealthy",
|
||||
Version: "0.9.8",
|
||||
Uptime: time.Since(s.startTime).Seconds(),
|
||||
Error: fmt.Sprintf("storage error: %v", err),
|
||||
})
|
||||
return Response{
|
||||
Success: false,
|
||||
Data: data,
|
||||
Error: fmt.Sprintf("storage error: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
healthCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
|
||||
status := "healthy"
|
||||
dbError := ""
|
||||
|
||||
_, pingErr := store.GetStatistics(healthCtx)
|
||||
dbResponseMs := time.Since(start).Seconds() * 1000
|
||||
|
||||
if pingErr != nil {
|
||||
status = "unhealthy"
|
||||
dbError = pingErr.Error()
|
||||
} else if dbResponseMs > 500 {
|
||||
status = "degraded"
|
||||
}
|
||||
|
||||
s.cacheMu.RLock()
|
||||
cacheSize := len(s.storageCache)
|
||||
s.cacheMu.RUnlock()
|
||||
|
||||
health := HealthResponse{
|
||||
Status: status,
|
||||
Version: "0.9.8",
|
||||
Uptime: time.Since(s.startTime).Seconds(),
|
||||
CacheSize: cacheSize,
|
||||
CacheHits: atomic.LoadInt64(&s.cacheHits),
|
||||
CacheMisses: atomic.LoadInt64(&s.cacheMisses),
|
||||
DBResponseTime: dbResponseMs,
|
||||
}
|
||||
|
||||
if dbError != "" {
|
||||
health.Error = dbError
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(health)
|
||||
return Response{
|
||||
Success: status != "unhealthy",
|
||||
Data: data,
|
||||
Error: dbError,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleCreate(req *Request) Response {
|
||||
var createArgs CreateArgs
|
||||
if err := json.Unmarshal(req.Args, &createArgs); err != nil {
|
||||
@@ -849,8 +917,11 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
|
||||
if entry, ok := s.storageCache[repoRoot]; ok {
|
||||
// Update last access time (safe under Lock)
|
||||
entry.lastAccess = time.Now()
|
||||
atomic.AddInt64(&s.cacheHits, 1)
|
||||
return entry.store, nil
|
||||
}
|
||||
|
||||
atomic.AddInt64(&s.cacheMisses, 1)
|
||||
|
||||
// Open storage
|
||||
store, err := sqlite.New(dbPath)
|
||||
|
||||
Reference in New Issue
Block a user