diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index e8d28fa7..08d7af36 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -57,7 +57,7 @@ {"id":"bd-15","title":"Add performance benchmarks document","description":"Document actual performance metrics with hyperfine tests","status":"closed","priority":3,"issue_type":"task","created_at":"2025-10-16T20:46:08.971822-07:00","updated_at":"2025-10-18T18:35:11.75028-07:00","closed_at":"2025-10-18T10:09:23.532938-07:00"} {"id":"bd-150","title":"Improve daemon fallback visibility and user feedback","description":"When daemon is unavailable, bd silently falls back to direct mode. Users don't know:\n- That daemon exists\n- Why auto-start failed\n- That they're in degraded mode\n- How to fix it\n\nThis creates confusion for multi-repo users who get slower performance without explanation.\n\nLocation: cmd/bd/main.go:98-130","design":"Add visibility at multiple levels:\n\n1. Debug logging (existing BD_DEBUG):\n - Already shows daemon connection attempts\n - Add auto-start success/failure\n\n2. Verbose mode (BD_VERBOSE):\n - Show warning when falling back\n - Suggest 'bd daemon --status' to check\n\n3. Status indicator:\n - Add daemon status to all commands when --json\n - Example: {\"daemon_status\": \"healthy\", \"daemon_type\": \"local\", ...}\n\n4. Explicit status command:\n - bd daemon --status shows detailed info\n - Shows whether daemon is running/healthy/unavailable\n\n5. Helpful error messages:\n - When auto-start fails repeatedly\n - When falling back after health check failure\n - With actionable next steps","acceptance_criteria":"- Users can see daemon status easily\n- Fallback warnings are helpful not noisy\n- JSON output includes daemon status\n- Error messages are actionable\n- Documentation explains status indicators\n- bd daemon --status command works","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-10-18T13:06:46.212558-07:00","updated_at":"2025-10-18T18:36:51.769633-07:00","closed_at":"2025-10-18T18:36:51.769633-07:00"} {"id":"bd-151","title":"Add version compatibility checks for daemon RPC protocol","description":"Client (bd CLI) and daemon may be different versions after upgrade. This causes:\n- Missing features (newer CLI, older daemon)\n- Protocol mismatches (older CLI, newer daemon)\n- Silent failures or confusing errors\n- No guidance to restart daemon\n\nLocation: internal/rpc/protocol.go, internal/rpc/client.go","design":"Add version field to RPC protocol:\n\n1. Add ClientVersion to Request struct\n2. Populate from Version constant in client\n3. Server checks compatibility in handleRequest()\n\nCompatibility rules:\n- Major version must match\n- Minor version backward compatible\n- Patch version always compatible\n\nOn mismatch:\n- Return clear error message\n- Suggest 'bd daemon --stop \u0026\u0026 bd daemon'\n- Log version info for debugging\n\nAdd to ping/health response:\n- Server version\n- Protocol version\n- Compatibility info\n\nAdd bd version --daemon command to check running daemon version.","acceptance_criteria":"- Version field in RPC protocol\n- Server validates client version\n- Clear error messages on mismatch\n- Health check returns version info\n- bd version --daemon command works\n- Documentation on version policy\n- Tests for version compatibility","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-10-18T13:06:57.417411-07:00","updated_at":"2025-10-18T18:46:03.047035-07:00","closed_at":"2025-10-18T18:46:03.047035-07:00"} -{"id":"bd-152","title":"Add resource limits to daemon (connections, cache, memory)","description":"Daemon has no resource limits. Under heavy load or attack, it could:\n- Accept unlimited connections\n- Cache unlimited databases\n- Use unbounded memory\n- Exhaust file descriptors\n\nNeed limits for:\n- Max concurrent RPC connections (default: 100)\n- Max storage cache size (default: 50)\n- Request timeout enforcement (default: 30s)\n- Memory pressure detection\n\nLocation: internal/rpc/server.go","design":"Add resource tracking to Server:\n\ntype Server struct {\n // ... existing\n maxConns int32\n activeConns int32 // atomic\n connSemaphore chan struct{}\n}\n\nUse semaphore pattern for connection limiting:\n- Acquire token before handling connection\n- Release on completion\n- Reject connections when full\n\nAdd configurable limits via env vars:\n- BEADS_DAEMON_MAX_CONNS (default: 100)\n- BEADS_DAEMON_MAX_CACHE_SIZE (default: 50)\n- BEADS_DAEMON_REQUEST_TIMEOUT (default: 30s)\n\nAdd memory pressure detection:\n- Monitor runtime.MemStats\n- Trigger cache eviction at threshold\n- Log warnings at high memory use","acceptance_criteria":"- Connection limit enforced\n- Excess connections rejected gracefully\n- Request timeouts work\n- Memory limits configurable\n- Metrics expose current usage\n- Tests for limit enforcement\n- Documentation on tuning limits","status":"open","priority":2,"issue_type":"feature","created_at":"2025-10-18T13:07:09.810963-07:00","updated_at":"2025-10-18T18:35:11.751371-07:00"} +{"id":"bd-152","title":"Add resource limits to daemon (connections, cache, memory)","description":"Daemon has no resource limits. Under heavy load or attack, it could:\n- Accept unlimited connections\n- Cache unlimited databases\n- Use unbounded memory\n- Exhaust file descriptors\n\nNeed limits for:\n- Max concurrent RPC connections (default: 100)\n- Max storage cache size (default: 50)\n- Request timeout enforcement (default: 30s)\n- Memory pressure detection\n\nLocation: internal/rpc/server.go","design":"Add resource tracking to Server:\n\ntype Server struct {\n // ... existing\n maxConns int32\n activeConns int32 // atomic\n connSemaphore chan struct{}\n}\n\nUse semaphore pattern for connection limiting:\n- Acquire token before handling connection\n- Release on completion\n- Reject connections when full\n\nAdd configurable limits via env vars:\n- BEADS_DAEMON_MAX_CONNS (default: 100)\n- BEADS_DAEMON_MAX_CACHE_SIZE (default: 50)\n- BEADS_DAEMON_REQUEST_TIMEOUT (default: 30s)\n\nAdd memory pressure detection:\n- Monitor runtime.MemStats\n- Trigger cache eviction at threshold\n- Log warnings at high memory use","acceptance_criteria":"- Connection limit enforced\n- Excess connections rejected gracefully\n- Request timeouts work\n- Memory limits configurable\n- Metrics expose current usage\n- Tests for limit enforcement\n- Documentation on tuning limits","status":"in_progress","priority":2,"issue_type":"feature","created_at":"2025-10-18T13:07:09.810963-07:00","updated_at":"2025-10-19T13:17:16.992675-07:00"} {"id":"bd-153","title":"Add telemetry and observability to daemon","description":"Daemon has no metrics or observability. Cannot monitor:\n- Request latency (p50, p95, p99)\n- Cache hit/miss rates\n- Active connections\n- Error rates\n- Resource usage over time\n\nNeeded for:\n- Performance debugging\n- Capacity planning\n- Production monitoring\n- SLA tracking\n\nLocation: internal/rpc/server.go","design":"Add metrics collection to daemon:\n\n1. Request metrics:\n - Total requests by operation\n - Latency histogram\n - Error count by type\n\n2. Cache metrics:\n - Hit/miss ratio\n - Eviction count\n - Current size\n\n3. Connection metrics:\n - Active connections\n - Total connections\n - Rejected connections\n\n4. Resource metrics:\n - Memory usage\n - Goroutine count\n - File descriptor count\n\nAdd metrics endpoint:\n- bd daemon --metrics (JSON output)\n- OpMetrics RPC operation\n- Prometheus-compatible format option\n\nAdd to health check response for free monitoring.","acceptance_criteria":"- Metrics collected for key operations\n- bd daemon --metrics command works\n- Metrics include timestamps\n- Latency percentiles calculated\n- Zero performance overhead\n- Documentation on metrics","status":"open","priority":2,"issue_type":"feature","created_at":"2025-10-18T13:07:19.835495-07:00","updated_at":"2025-10-18T18:35:11.751751-07:00"} {"id":"bd-154","title":"Add log rotation for daemon.log","description":"daemon.log grows forever without rotation. With sync every 5 minutes:\n- ~105k log entries per year\n- No size limit\n- No cleanup\n- Eventually fills disk\n\nNeed automatic log rotation with:\n- Size-based rotation (default: 10MB)\n- Age-based cleanup (default: 7 days)\n- Compression of old logs\n- Configurable retention\n\nLocation: cmd/bd/daemon.go:455","design":"Use lumberjack library for rotation:\n\nimport \"gopkg.in/natefinch/lumberjack.v2\"\n\nlogF := \u0026lumberjack.Logger{\n Filename: logPath,\n MaxSize: 10, // MB\n MaxBackups: 3,\n MaxAge: 7, // days\n Compress: true,\n}\n\nMake configurable via env vars:\n- BEADS_DAEMON_LOG_MAX_SIZE (default: 10MB)\n- BEADS_DAEMON_LOG_MAX_BACKUPS (default: 3)\n- BEADS_DAEMON_LOG_MAX_AGE (default: 7 days)\n\nAdd to daemon status output:\n- Current log size\n- Number of archived logs\n- Oldest log timestamp","acceptance_criteria":"- Log rotation works automatically\n- Old logs are compressed\n- Retention policy enforced\n- Configuration via env vars works\n- Log size stays bounded\n- No log data loss during rotation\n- Documentation updated","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-10-18T13:07:30.94896-07:00","updated_at":"2025-10-18T18:35:11.752336-07:00","closed_at":"2025-10-18T16:27:51.349037-07:00"} {"id":"bd-155","title":"Daemon production readiness","description":"Make beads daemon production-ready for long-running use, multi-repo deployments, and resilient operation.\n\nCurrent state: Good foundation, works well for development\nTarget state: Production-ready for individual developers and small teams\n\nGap areas:\n1. Resource management (cache eviction, limits)\n2. Health monitoring and crash recovery\n3. Process lifecycle management\n4. User experience (visibility, feedback)\n5. Operational concerns (logging, metrics)\n\nSuccess criteria:\n- Can run for weeks without restart\n- Handles 50+ repositories efficiently\n- Recovers from crashes automatically\n- Users understand daemon status\n- Observable and debuggable","acceptance_criteria":"All child issues completed:\n- P0 issues: Storage cache, health checks, crash recovery, MCP cleanup\n- P1 issues: Global auto-start, visibility, version checks\n- P2 issues: Resource limits, telemetry, log rotation\n\nValidation:\n- Run daemon for 7+ days without issues\n- Test with 50+ repositories\n- Verify crash recovery\n- Confirm resource usage is bounded\n- Check metrics and logs are useful","status":"in_progress","priority":0,"issue_type":"epic","created_at":"2025-10-18T13:07:43.543715-07:00","updated_at":"2025-10-18T18:35:11.752924-07:00"} diff --git a/internal/rpc/limits_test.go b/internal/rpc/limits_test.go new file mode 100644 index 00000000..32c61a36 --- /dev/null +++ b/internal/rpc/limits_test.go @@ -0,0 +1,330 @@ +package rpc + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/steveyegge/beads/internal/storage/sqlite" +) + +func TestConnectionLimits(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, ".beads", "test.db") + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + t.Fatal(err) + } + + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + socketPath := filepath.Join(tmpDir, "test.sock") + + // Set low connection limit for testing + os.Setenv("BEADS_DAEMON_MAX_CONNS", "5") + defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS") + + srv := NewServer(socketPath, store) + if srv.maxConns != 5 { + t.Fatalf("expected maxConns=5, got %d", srv.maxConns) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + if err := srv.Start(ctx); err != nil && ctx.Err() == nil { + t.Logf("server error: %v", err) + } + }() + + // Wait for server to be ready + time.Sleep(100 * time.Millisecond) + defer srv.Stop() + + // Open maxConns connections and hold them + var wg sync.WaitGroup + connections := make([]net.Conn, srv.maxConns) + + for i := 0; i < srv.maxConns; i++ { + conn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to dial connection %d: %v", i, err) + } + connections[i] = conn + + // Send a long-running ping to keep connection busy + wg.Add(1) + go func(c net.Conn, idx int) { + defer wg.Done() + req := Request{ + Operation: OpPing, + } + data, _ := json.Marshal(req) + c.Write(append(data, '\n')) + + // Read response + reader := bufio.NewReader(c) + _, _ = reader.ReadBytes('\n') + }(conn, i) + } + + // Wait for all connections to be active + time.Sleep(200 * time.Millisecond) + + // Verify active connection count + activeConns := atomic.LoadInt32(&srv.activeConns) + if activeConns != int32(srv.maxConns) { + t.Errorf("expected %d active connections, got %d", srv.maxConns, activeConns) + } + + // Try to open one more connection - should be rejected + extraConn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to dial extra connection: %v", err) + } + defer extraConn.Close() + + // Send request on extra connection + req := Request{Operation: OpPing} + data, _ := json.Marshal(req) + extraConn.Write(append(data, '\n')) + + // Set short read timeout to detect rejection + extraConn.SetReadDeadline(time.Now().Add(500 * time.Millisecond)) + reader := bufio.NewReader(extraConn) + _, err = reader.ReadBytes('\n') + + // Connection should be closed (EOF or timeout) + if err == nil { + t.Error("expected extra connection to be rejected, but got response") + } + + // Close existing connections + for _, conn := range connections { + conn.Close() + } + wg.Wait() + + // Wait for connection cleanup + time.Sleep(100 * time.Millisecond) + + // Now should be able to connect again + newConn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to reconnect after cleanup: %v", err) + } + defer newConn.Close() + + req = Request{Operation: OpPing} + data, _ = json.Marshal(req) + newConn.Write(append(data, '\n')) + + reader = bufio.NewReader(newConn) + line, err := reader.ReadBytes('\n') + if err != nil { + t.Fatalf("failed to read response: %v", err) + } + + var resp Response + if err := json.Unmarshal(line, &resp); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + + if !resp.Success { + t.Error("expected successful ping after connection cleanup") + } +} + +func TestRequestTimeout(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, ".beads", "test.db") + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + t.Fatal(err) + } + + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + socketPath := filepath.Join(tmpDir, "test.sock") + + // Set very short timeout for testing + os.Setenv("BEADS_DAEMON_REQUEST_TIMEOUT", "100ms") + defer os.Unsetenv("BEADS_DAEMON_REQUEST_TIMEOUT") + + srv := NewServer(socketPath, store) + if srv.requestTimeout != 100*time.Millisecond { + t.Fatalf("expected timeout=100ms, got %v", srv.requestTimeout) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + if err := srv.Start(ctx); err != nil && ctx.Err() == nil { + t.Logf("server error: %v", err) + } + }() + + time.Sleep(100 * time.Millisecond) + defer srv.Stop() + + conn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer conn.Close() + + // Send partial request and wait for timeout + conn.Write([]byte(`{"operation":"ping"`)) // Incomplete JSON + + // Wait longer than timeout + time.Sleep(200 * time.Millisecond) + + // Try to write - connection should be closed due to read timeout + _, err = conn.Write([]byte("}\n")) + if err == nil { + t.Error("expected connection to be closed due to timeout") + } +} + +func TestMemoryPressureDetection(t *testing.T) { + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, ".beads", "test.db") + if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil { + t.Fatal(err) + } + + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + socketPath := filepath.Join(tmpDir, "test.sock") + + // Set very low memory threshold to trigger eviction + os.Setenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB", "1") + defer os.Unsetenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB") + + srv := NewServer(socketPath, store) + + // Add some entries to cache + srv.cacheMu.Lock() + for i := 0; i < 10; i++ { + path := fmt.Sprintf("/test/path/%d", i) + srv.storageCache[path] = &StorageCacheEntry{ + store: store, + lastAccess: time.Now().Add(-time.Duration(i) * time.Minute), + } + } + initialSize := len(srv.storageCache) + srv.cacheMu.Unlock() + + // Trigger memory pressure check (should evict entries) + srv.checkMemoryPressure() + + // Check that some entries were evicted + srv.cacheMu.RLock() + finalSize := len(srv.storageCache) + srv.cacheMu.RUnlock() + + if finalSize >= initialSize { + t.Errorf("expected cache eviction, but size went from %d to %d", initialSize, finalSize) + } + + t.Logf("Cache evicted: %d -> %d entries", initialSize, finalSize) +} + +func TestHealthResponseIncludesLimits(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "bd-limits-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + dbPath := filepath.Join(tmpDir, "test.db") + socketPath := filepath.Join(tmpDir, "test.sock") + + store, err := sqlite.New(dbPath) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + os.Setenv("BEADS_DAEMON_MAX_CONNS", "50") + defer os.Unsetenv("BEADS_DAEMON_MAX_CONNS") + + srv := NewServer(socketPath, store) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + if err := srv.Start(ctx); err != nil && ctx.Err() == nil { + t.Logf("server error: %v", err) + } + }() + + time.Sleep(100 * time.Millisecond) + defer srv.Stop() + + conn, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer conn.Close() + + req := Request{Operation: OpHealth} + data, _ := json.Marshal(req) + conn.Write(append(data, '\n')) + + reader := bufio.NewReader(conn) + line, err := reader.ReadBytes('\n') + if err != nil { + t.Fatalf("failed to read response: %v", err) + } + + var resp Response + if err := json.Unmarshal(line, &resp); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + + if !resp.Success { + t.Fatalf("health check failed: %s", resp.Error) + } + + var health HealthResponse + if err := json.Unmarshal(resp.Data, &health); err != nil { + t.Fatalf("failed to unmarshal health response: %v", err) + } + + // Verify limit fields are present + if health.MaxConns != 50 { + t.Errorf("expected MaxConns=50, got %d", health.MaxConns) + } + + if health.ActiveConns < 0 { + t.Errorf("expected ActiveConns>=0, got %d", health.ActiveConns) + } + + if health.MemoryAllocMB == 0 { + t.Error("expected MemoryAllocMB>0") + } + + t.Logf("Health: %d/%d connections, %d MB memory", health.ActiveConns, health.MaxConns, health.MemoryAllocMB) +} diff --git a/internal/rpc/protocol.go b/internal/rpc/protocol.go index 6af1e03e..8e988f38 100644 --- a/internal/rpc/protocol.go +++ b/internal/rpc/protocol.go @@ -150,6 +150,9 @@ type HealthResponse struct { CacheHits int64 `json:"cache_hits"` CacheMisses int64 `json:"cache_misses"` DBResponseTime float64 `json:"db_response_ms"` + ActiveConns int32 `json:"active_connections"` + MaxConns int `json:"max_connections"` + MemoryAllocMB uint64 `json:"memory_alloc_mb"` Error string `json:"error,omitempty"` } diff --git a/internal/rpc/server.go b/internal/rpc/server.go index f125a2b0..a6e6d485 100644 --- a/internal/rpc/server.go +++ b/internal/rpc/server.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "path/filepath" + "runtime" "sort" "strings" "sync" @@ -52,6 +53,12 @@ type Server struct { startTime time.Time cacheHits int64 cacheMisses int64 + // Connection limiting + maxConns int + activeConns int32 // atomic counter + connSemaphore chan struct{} + // Request timeout + requestTimeout time.Duration } // NewServer creates a new RPC server @@ -73,14 +80,32 @@ func NewServer(socketPath string, store storage.Storage) *Server { } } + maxConns := 100 // default + if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" { + var conns int + if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 { + maxConns = conns + } + } + + requestTimeout := 30 * time.Second // default + if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" { + if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 { + requestTimeout = timeout + } + } + return &Server{ - socketPath: socketPath, - storage: store, - storageCache: make(map[string]*StorageCacheEntry), - maxCacheSize: maxCacheSize, - cacheTTL: cacheTTL, - shutdownChan: make(chan struct{}), - startTime: time.Now(), + socketPath: socketPath, + storage: store, + storageCache: make(map[string]*StorageCacheEntry), + maxCacheSize: maxCacheSize, + cacheTTL: cacheTTL, + shutdownChan: make(chan struct{}), + startTime: time.Now(), + maxConns: maxConns, + connSemaphore: make(chan struct{}, maxConns), + requestTimeout: requestTimeout, } } @@ -131,7 +156,20 @@ func (s *Server) Start(ctx context.Context) error { return fmt.Errorf("failed to accept connection: %w", err) } - go s.handleConnection(conn) + // Try to acquire connection slot (non-blocking) + select { + case s.connSemaphore <- struct{}{}: + // Acquired slot, handle connection + go func(c net.Conn) { + defer func() { <-s.connSemaphore }() // Release slot + atomic.AddInt32(&s.activeConns, 1) + defer atomic.AddInt32(&s.activeConns, -1) + s.handleConnection(c) + }(conn) + default: + // Max connections reached, reject immediately + conn.Close() + } } } @@ -218,7 +256,7 @@ func (s *Server) handleSignals() { s.Stop() } -// runCleanupLoop periodically evicts stale storage connections +// runCleanupLoop periodically evicts stale storage connections and checks memory pressure func (s *Server) runCleanupLoop() { s.cleanupTicker = time.NewTicker(5 * time.Minute) defer s.cleanupTicker.Stop() @@ -226,6 +264,7 @@ func (s *Server) runCleanupLoop() { for { select { case <-s.cleanupTicker.C: + s.checkMemoryPressure() s.evictStaleStorage() case <-s.shutdownChan: return @@ -233,6 +272,71 @@ func (s *Server) runCleanupLoop() { } } +// checkMemoryPressure monitors memory usage and triggers aggressive eviction if needed +func (s *Server) checkMemoryPressure() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + + // Memory thresholds (configurable via env var) + const defaultThresholdMB = 500 + thresholdMB := defaultThresholdMB + if env := os.Getenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB"); env != "" { + var threshold int + if _, err := fmt.Sscanf(env, "%d", &threshold); err == nil && threshold > 0 { + thresholdMB = threshold + } + } + + allocMB := m.Alloc / 1024 / 1024 + if allocMB > uint64(thresholdMB) { + fmt.Fprintf(os.Stderr, "Warning: High memory usage detected (%d MB), triggering aggressive cache eviction\n", allocMB) + s.aggressiveEviction() + runtime.GC() // Suggest garbage collection + } +} + +// aggressiveEviction evicts 50% of cached storage to reduce memory pressure +func (s *Server) aggressiveEviction() { + toClose := []storage.Storage{} + + s.cacheMu.Lock() + + if len(s.storageCache) == 0 { + s.cacheMu.Unlock() + return + } + + // Build sorted list by last access + type cacheItem struct { + path string + entry *StorageCacheEntry + } + items := make([]cacheItem, 0, len(s.storageCache)) + for path, entry := range s.storageCache { + items = append(items, cacheItem{path, entry}) + } + + sort.Slice(items, func(i, j int) bool { + return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) + }) + + // Evict oldest 50% + numToEvict := len(items) / 2 + for i := 0; i < numToEvict; i++ { + toClose = append(toClose, items[i].entry.store) + delete(s.storageCache, items[i].path) + } + + s.cacheMu.Unlock() + + // Close without holding lock + for _, store := range toClose { + if err := store.Close(); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) + } + } +} + // evictStaleStorage removes idle connections and enforces cache size limits func (s *Server) evictStaleStorage() { now := time.Now() @@ -291,6 +395,11 @@ func (s *Server) handleConnection(conn net.Conn) { writer := bufio.NewWriter(conn) for { + // Set read deadline for the next request + if err := conn.SetReadDeadline(time.Now().Add(s.requestTimeout)); err != nil { + return + } + line, err := reader.ReadBytes('\n') if err != nil { return @@ -306,6 +415,11 @@ func (s *Server) handleConnection(conn net.Conn) { continue } + // Set write deadline for the response + if err := conn.SetWriteDeadline(time.Now().Add(s.requestTimeout)); err != nil { + return + } + resp := s.handleRequest(&req) s.writeResponse(writer, resp) } @@ -488,6 +602,10 @@ func (s *Server) handlePing(_ *Request) Response { func (s *Server) handleHealth(req *Request) Response { start := time.Now() + // Get memory stats for health response + var m runtime.MemStats + runtime.ReadMemStats(&m) + store, err := s.getStorageForRequest(req) if err != nil { data, _ := json.Marshal(HealthResponse{ @@ -541,6 +659,9 @@ func (s *Server) handleHealth(req *Request) Response { CacheHits: atomic.LoadInt64(&s.cacheHits), CacheMisses: atomic.LoadInt64(&s.cacheMisses), DBResponseTime: dbResponseMs, + ActiveConns: atomic.LoadInt32(&s.activeConns), + MaxConns: s.maxConns, + MemoryAllocMB: m.Alloc / 1024 / 1024, } if dbError != "" {