From 322ab63b10e39922fb5a2e8c30e65f900f1f8665 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Tue, 28 Oct 2025 10:33:19 -0700 Subject: [PATCH] Remove daemon storage cache (bd-33, bd-34, bd-35) - Deleted server_cache_storage.go (~300 lines) - Removed cache fields from Server struct - Simplified database routing to use s.storage directly - Removed cache metrics from health and metrics endpoints - Deleted server_eviction_test.go (cache eviction tests) - Cleaned up limits_test.go (removed cache assertions) - All tests passing --- cmd/bd/daemon.go | 20 - internal/rpc/client.go | 4 +- internal/rpc/limits_test.go | 55 -- internal/rpc/metrics.go | 18 +- internal/rpc/metrics_test.go | 25 +- internal/rpc/protocol.go | 3 - internal/rpc/server_cache_storage.go | 286 ---------- internal/rpc/server_eviction_test.go | 525 ------------------ internal/rpc/server_lifecycle_conn.go | 2 - .../server_routing_validation_diagnostics.go | 32 +- 10 files changed, 6 insertions(+), 964 deletions(-) delete mode 100644 internal/rpc/server_cache_storage.go delete mode 100644 internal/rpc/server_eviction_test.go diff --git a/cmd/bd/daemon.go b/cmd/bd/daemon.go index f0a0802f..2517cf1a 100644 --- a/cmd/bd/daemon.go +++ b/cmd/bd/daemon.go @@ -386,15 +386,6 @@ func showDaemonHealth(global bool) { fmt.Printf(" Version: %s\n", health.Version) fmt.Printf(" Uptime: %s\n", formatUptime(health.Uptime)) - fmt.Printf(" Cache Size: %d databases\n", health.CacheSize) - fmt.Printf(" Cache Hits: %d\n", health.CacheHits) - fmt.Printf(" Cache Misses: %d\n", health.CacheMisses) - - if health.CacheHits+health.CacheMisses > 0 { - hitRate := float64(health.CacheHits) / float64(health.CacheHits+health.CacheMisses) * 100 - fmt.Printf(" Cache Hit Rate: %.1f%%\n", hitRate) - } - fmt.Printf(" DB Response Time: %.2f ms\n", health.DBResponseTime) if health.Error != "" { @@ -455,17 +446,6 @@ func showDaemonMetrics(global bool) { fmt.Printf("Uptime: %.1f seconds (%.1f minutes)\n", metrics.UptimeSeconds, metrics.UptimeSeconds/60) fmt.Printf("Timestamp: %s\n\n", metrics.Timestamp.Format(time.RFC3339)) - // Cache metrics - fmt.Printf("Cache Metrics:\n") - fmt.Printf(" Size: %d databases\n", metrics.CacheSize) - fmt.Printf(" Hits: %d\n", metrics.CacheHits) - fmt.Printf(" Misses: %d\n", metrics.CacheMisses) - if metrics.CacheHits+metrics.CacheMisses > 0 { - hitRate := float64(metrics.CacheHits) / float64(metrics.CacheHits+metrics.CacheMisses) * 100 - fmt.Printf(" Hit Rate: %.1f%%\n", hitRate) - } - fmt.Printf(" Evictions: %d\n\n", metrics.CacheEvictions) - // Connection metrics fmt.Printf("Connection Metrics:\n") fmt.Printf(" Total: %d\n", metrics.TotalConns) diff --git a/internal/rpc/client.go b/internal/rpc/client.go index e65a40e1..18a27a2f 100644 --- a/internal/rpc/client.go +++ b/internal/rpc/client.go @@ -74,8 +74,8 @@ func TryConnectWithTimeout(socketPath string, dialTimeout time.Duration) (*Clien } if os.Getenv("BD_DEBUG") != "" { - fmt.Fprintf(os.Stderr, "Debug: connected to daemon (status: %s, uptime: %.1fs, cache: %d)\n", - health.Status, health.Uptime, health.CacheSize) + fmt.Fprintf(os.Stderr, "Debug: connected to daemon (status: %s, uptime: %.1fs)\n", + health.Status, health.Uptime) } return client, nil diff --git a/internal/rpc/limits_test.go b/internal/rpc/limits_test.go index dfa1fef4..0a563ca3 100644 --- a/internal/rpc/limits_test.go +++ b/internal/rpc/limits_test.go @@ -4,11 +4,9 @@ import ( "bufio" "context" "encoding/json" - "fmt" "net" "os" "path/filepath" - "runtime" "sync" "sync/atomic" "testing" @@ -200,59 +198,6 @@ func TestRequestTimeout(t *testing.T) { } } -func TestMemoryPressureDetection(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("memory pressure detection thresholds are not reliable on Windows") - } - - tmpDir := t.TempDir() - dbPath := filepath.Join(tmpDir, ".beads", "test.db") - if err := os.MkdirAll(filepath.Dir(dbPath), 0750); err != nil { - t.Fatal(err) - } - - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - defer store.Close() - - socketPath := filepath.Join(tmpDir, "test.sock") - - srv := NewServer(socketPath, store, tmpDir, dbPath) - - // Add some entries to cache - srv.cacheMu.Lock() - for i := 0; i < 10; i++ { - path := fmt.Sprintf("/test/path/%d", i) - srv.storageCache[path] = &StorageCacheEntry{ - store: store, - lastAccess: time.Now().Add(-time.Duration(i) * time.Minute), - } - } - initialSize := len(srv.storageCache) - srv.cacheMu.Unlock() - - // Trigger aggressive eviction directly (should evict 50% of entries) - srv.aggressiveEviction() - - // Check that some entries were evicted - srv.cacheMu.RLock() - finalSize := len(srv.storageCache) - srv.cacheMu.RUnlock() - - if finalSize >= initialSize { - t.Errorf("expected cache eviction, but size went from %d to %d", initialSize, finalSize) - } - - expectedSize := initialSize / 2 - if finalSize != expectedSize { - t.Errorf("expected exactly %d entries after evicting 50%%, got %d", expectedSize, finalSize) - } - - t.Logf("Cache evicted: %d -> %d entries", initialSize, finalSize) -} - func TestHealthResponseIncludesLimits(t *testing.T) { tmpDir, err := os.MkdirTemp("", "bd-limits-test-*") if err != nil { diff --git a/internal/rpc/metrics.go b/internal/rpc/metrics.go index f56357b2..feea6dcc 100644 --- a/internal/rpc/metrics.go +++ b/internal/rpc/metrics.go @@ -23,9 +23,6 @@ type Metrics struct { totalConns int64 rejectedConns int64 - // Cache metrics (handled separately via atomic in Server) - cacheEvictions int64 - // System start time (for uptime calculation) startTime time.Time } @@ -76,13 +73,8 @@ func (m *Metrics) RecordRejectedConnection() { atomic.AddInt64(&m.rejectedConns, 1) } -// RecordCacheEviction records a cache eviction event -func (m *Metrics) RecordCacheEviction() { - atomic.AddInt64(&m.cacheEvictions, 1) -} - // Snapshot returns a point-in-time snapshot of all metrics -func (m *Metrics) Snapshot(cacheHits, cacheMisses int64, cacheSize, activeConns int) MetricsSnapshot { +func (m *Metrics) Snapshot(activeConns int) MetricsSnapshot { // Copy data under a short critical section m.mu.RLock() @@ -161,10 +153,6 @@ func (m *Metrics) Snapshot(cacheHits, cacheMisses int64, cacheSize, activeConns Timestamp: time.Now(), UptimeSeconds: uptimeSeconds, Operations: operations, - CacheHits: cacheHits, - CacheMisses: cacheMisses, - CacheSize: cacheSize, - CacheEvictions: atomic.LoadInt64(&m.cacheEvictions), TotalConns: atomic.LoadInt64(&m.totalConns), ActiveConns: activeConns, RejectedConns: atomic.LoadInt64(&m.rejectedConns), @@ -179,10 +167,6 @@ type MetricsSnapshot struct { Timestamp time.Time `json:"timestamp"` UptimeSeconds float64 `json:"uptime_seconds"` Operations []OperationMetrics `json:"operations"` - CacheHits int64 `json:"cache_hits"` - CacheMisses int64 `json:"cache_misses"` - CacheSize int `json:"cache_size"` - CacheEvictions int64 `json:"cache_evictions"` TotalConns int64 `json:"total_connections"` ActiveConns int `json:"active_connections"` RejectedConns int64 `json:"rejected_connections"` diff --git a/internal/rpc/metrics_test.go b/internal/rpc/metrics_test.go index 966bfff6..66c73330 100644 --- a/internal/rpc/metrics_test.go +++ b/internal/rpc/metrics_test.go @@ -52,16 +52,6 @@ func TestMetricsRecording(t *testing.T) { t.Errorf("Expected rejected count to increase by 1, got %d -> %d", before, after) } }) - - t.Run("record cache eviction", func(t *testing.T) { - before := m.cacheEvictions - m.RecordCacheEviction() - after := m.cacheEvictions - - if after != before+1 { - t.Errorf("Expected eviction count to increase by 1, got %d -> %d", before, after) - } - }) } func TestMetricsSnapshot(t *testing.T) { @@ -74,10 +64,9 @@ func TestMetricsSnapshot(t *testing.T) { m.RecordError("create") m.RecordConnection() m.RecordRejectedConnection() - m.RecordCacheEviction() // Take snapshot - snapshot := m.Snapshot(100, 10, 50, 3) + snapshot := m.Snapshot(3) t.Run("basic metrics", func(t *testing.T) { if snapshot.TotalConns < 1 { @@ -86,18 +75,6 @@ func TestMetricsSnapshot(t *testing.T) { if snapshot.RejectedConns < 1 { t.Error("Expected at least 1 rejected connection") } - if snapshot.CacheEvictions < 1 { - t.Error("Expected at least 1 cache eviction") - } - if snapshot.CacheHits != 100 { - t.Errorf("Expected 100 cache hits, got %d", snapshot.CacheHits) - } - if snapshot.CacheMisses != 10 { - t.Errorf("Expected 10 cache misses, got %d", snapshot.CacheMisses) - } - if snapshot.CacheSize != 50 { - t.Errorf("Expected cache size 50, got %d", snapshot.CacheSize) - } if snapshot.ActiveConns != 3 { t.Errorf("Expected 3 active connections, got %d", snapshot.ActiveConns) } diff --git a/internal/rpc/protocol.go b/internal/rpc/protocol.go index c5f25b74..54974db8 100644 --- a/internal/rpc/protocol.go +++ b/internal/rpc/protocol.go @@ -187,9 +187,6 @@ type HealthResponse struct { ClientVersion string `json:"client_version,omitempty"` // Client version from request Compatible bool `json:"compatible"` // Whether versions are compatible Uptime float64 `json:"uptime_seconds"` - CacheSize int `json:"cache_size"` - CacheHits int64 `json:"cache_hits"` - CacheMisses int64 `json:"cache_misses"` DBResponseTime float64 `json:"db_response_ms"` ActiveConns int32 `json:"active_connections"` MaxConns int `json:"max_connections"` diff --git a/internal/rpc/server_cache_storage.go b/internal/rpc/server_cache_storage.go deleted file mode 100644 index 45eca3df..00000000 --- a/internal/rpc/server_cache_storage.go +++ /dev/null @@ -1,286 +0,0 @@ -package rpc - -import ( - "fmt" - "os" - "path/filepath" - "runtime" - "sort" - "sync/atomic" - "time" - - "github.com/steveyegge/beads/internal/storage" - "github.com/steveyegge/beads/internal/storage/sqlite" -) - -// StorageCacheEntry holds a cached storage with metadata for eviction -type StorageCacheEntry struct { - store storage.Storage - lastAccess time.Time - dbMtime time.Time // DB file modification time for detecting external changes -} - -// runCleanupLoop periodically evicts stale storage connections and checks memory pressure -func (s *Server) runCleanupLoop() { - s.cleanupTicker = time.NewTicker(5 * time.Minute) - defer s.cleanupTicker.Stop() - - for { - select { - case <-s.cleanupTicker.C: - s.checkMemoryPressure() - s.evictStaleStorage() - case <-s.shutdownChan: - return - } - } -} - -// checkMemoryPressure monitors memory usage and triggers aggressive eviction if needed -func (s *Server) checkMemoryPressure() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - - // Memory thresholds (configurable via env var) - const defaultThresholdMB = 500 - thresholdMB := defaultThresholdMB - if env := os.Getenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB"); env != "" { - var threshold int - if _, err := fmt.Sscanf(env, "%d", &threshold); err == nil && threshold > 0 { - thresholdMB = threshold - } - } - - allocMB := m.Alloc / 1024 / 1024 - // #nosec G115 - safe conversion of positive value - if allocMB > uint64(thresholdMB) { - fmt.Fprintf(os.Stderr, "Warning: High memory usage detected (%d MB), triggering aggressive cache eviction\n", allocMB) - s.aggressiveEviction() - runtime.GC() // Suggest garbage collection - } -} - -// aggressiveEviction evicts 50% of cached storage to reduce memory pressure -func (s *Server) aggressiveEviction() { - toClose := []storage.Storage{} - - s.cacheMu.Lock() - - if len(s.storageCache) == 0 { - s.cacheMu.Unlock() - return - } - - // Build sorted list by last access - type cacheItem struct { - path string - entry *StorageCacheEntry - } - items := make([]cacheItem, 0, len(s.storageCache)) - for path, entry := range s.storageCache { - items = append(items, cacheItem{path, entry}) - } - - sort.Slice(items, func(i, j int) bool { - return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) - }) - - // Evict oldest 50% - numToEvict := len(items) / 2 - for i := 0; i < numToEvict; i++ { - toClose = append(toClose, items[i].entry.store) - delete(s.storageCache, items[i].path) - } - - s.cacheMu.Unlock() - - // Close without holding lock - for _, store := range toClose { - if err := store.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) - } - } -} - -// evictStaleStorage removes idle connections and enforces cache size limits -func (s *Server) evictStaleStorage() { - now := time.Now() - toClose := []storage.Storage{} - - s.cacheMu.Lock() - - // First pass: evict TTL-expired entries - for path, entry := range s.storageCache { - if now.Sub(entry.lastAccess) > s.cacheTTL { - toClose = append(toClose, entry.store) - delete(s.storageCache, path) - } - } - - // Second pass: enforce max cache size with LRU eviction - if len(s.storageCache) > s.maxCacheSize { - // Build sorted list of entries by lastAccess - type cacheItem struct { - path string - entry *StorageCacheEntry - } - items := make([]cacheItem, 0, len(s.storageCache)) - for path, entry := range s.storageCache { - items = append(items, cacheItem{path, entry}) - } - - // Sort by lastAccess (oldest first) with sort.Slice - sort.Slice(items, func(i, j int) bool { - return items[i].entry.lastAccess.Before(items[j].entry.lastAccess) - }) - - // Evict oldest entries until we're under the limit - numToEvict := len(s.storageCache) - s.maxCacheSize - for i := 0; i < numToEvict && i < len(items); i++ { - toClose = append(toClose, items[i].entry.store) - delete(s.storageCache, items[i].path) - s.metrics.RecordCacheEviction() - } - } - - // Unlock before closing to avoid holding lock during Close - s.cacheMu.Unlock() - - // Close connections synchronously - for _, store := range toClose { - if err := store.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err) - } - } -} - -// getStorageForRequest returns the appropriate storage for the request -// If req.Cwd is set, it finds the database for that directory -// Otherwise, it uses the default storage -func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) { - // If no cwd specified, use default storage - if req.Cwd == "" { - return s.storage, nil - } - - // Find database for this cwd (to get the canonical repo root) - dbPath := s.findDatabaseForCwd(req.Cwd) - if dbPath == "" { - return nil, fmt.Errorf("no .beads database found for path: %s", req.Cwd) - } - - // Canonicalize key to repo root (parent of .beads directory) - beadsDir := filepath.Dir(dbPath) - repoRoot := filepath.Dir(beadsDir) - - // Check cache first with write lock (to avoid race on lastAccess update) - s.cacheMu.Lock() - defer s.cacheMu.Unlock() - - if entry, ok := s.storageCache[repoRoot]; ok { - // Check if DB file has been modified externally - info, err := os.Stat(dbPath) - if err == nil && !info.ModTime().Equal(entry.dbMtime) { - // DB file changed - evict stale cache entry - // Remove from cache first to prevent concurrent access - delete(s.storageCache, repoRoot) - atomic.AddInt64(&s.cacheMisses, 1) - // Close storage after removing from cache (safe now) - // Unlock briefly to avoid blocking during Close() - s.cacheMu.Unlock() - if err := entry.store.Close(); err != nil { - // Log but don't fail - we'll reopen anyway - fmt.Fprintf(os.Stderr, "Warning: failed to close stale cached storage: %v\n", err) - } - s.cacheMu.Lock() - // Fall through to reopen - } else if err == nil { - // Cache hit - DB file unchanged - entry.lastAccess = time.Now() - atomic.AddInt64(&s.cacheHits, 1) - return entry.store, nil - } else { - // Stat failed - evict and reopen - // Remove from cache first to prevent concurrent access - delete(s.storageCache, repoRoot) - atomic.AddInt64(&s.cacheMisses, 1) - // Close storage after removing from cache - s.cacheMu.Unlock() - if err := entry.store.Close(); err != nil { - fmt.Fprintf(os.Stderr, "Warning: failed to close cached storage: %v\n", err) - } - s.cacheMu.Lock() - // Fall through to reopen - } - } else { - atomic.AddInt64(&s.cacheMisses, 1) - } - - // Open storage - store, err := sqlite.New(dbPath) - if err != nil { - return nil, fmt.Errorf("failed to open database at %s: %w", dbPath, err) - } - - // Get mtime for the newly opened DB - info, err := os.Stat(dbPath) - if err != nil { - // If we can't stat, still cache it but with zero mtime (will invalidate on next check) - info = nil - } - - mtime := time.Time{} - if info != nil { - mtime = info.ModTime() - } - - // Cache it with current timestamp and mtime - s.storageCache[repoRoot] = &StorageCacheEntry{ - store: store, - lastAccess: time.Now(), - dbMtime: mtime, - } - - // Enforce LRU immediately to prevent FD spikes between cleanup ticks - needEvict := len(s.storageCache) > s.maxCacheSize - s.cacheMu.Unlock() - - if needEvict { - s.evictStaleStorage() - } - - // Re-acquire lock for defer - s.cacheMu.Lock() - - return store, nil -} - -// findDatabaseForCwd walks up from cwd to find .beads/*.db -func (s *Server) findDatabaseForCwd(cwd string) string { - dir, err := filepath.Abs(cwd) - if err != nil { - return "" - } - - // Walk up directory tree - for { - beadsDir := filepath.Join(dir, ".beads") - if info, err := os.Stat(beadsDir); err == nil && info.IsDir() { - // Found .beads/ directory, look for *.db files - matches, err := filepath.Glob(filepath.Join(beadsDir, "*.db")) - if err == nil && len(matches) > 0 { - return matches[0] - } - } - - // Move up one directory - parent := filepath.Dir(dir) - if parent == dir { - // Reached filesystem root - break - } - dir = parent - } - - return "" -} diff --git a/internal/rpc/server_eviction_test.go b/internal/rpc/server_eviction_test.go deleted file mode 100644 index 729ed1dd..00000000 --- a/internal/rpc/server_eviction_test.go +++ /dev/null @@ -1,525 +0,0 @@ -package rpc - -import ( - "fmt" - "os" - "path/filepath" - "testing" - "time" - - "github.com/steveyegge/beads/internal/storage/sqlite" -) - -func TestStorageCacheEviction_TTL(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server with short TTL for testing - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - server.cacheTTL = 100 * time.Millisecond // Short TTL for testing - defer server.Stop() - - // Create two test databases - db1 := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(db1), 0755) - store1, err := sqlite.New(db1) - if err != nil { - t.Fatal(err) - } - store1.Close() - - db2 := filepath.Join(tmpDir, "repo2", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(db2), 0755) - store2, err := sqlite.New(db2) - if err != nil { - t.Fatal(err) - } - store2.Close() - - // Access both repos to populate cache - req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req1) - if err != nil { - t.Fatal(err) - } - - req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")} - _, err = server.getStorageForRequest(req2) - if err != nil { - t.Fatal(err) - } - - // Verify both are cached - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 2 { - t.Fatalf("expected 2 cached entries, got %d", cacheSize) - } - - // Wait for TTL to expire - time.Sleep(150 * time.Millisecond) - - // Run eviction - server.evictStaleStorage() - - // Verify both entries were evicted - server.cacheMu.RLock() - cacheSize = len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 0 { - t.Fatalf("expected 0 cached entries after TTL eviction, got %d", cacheSize) - } -} - -func TestStorageCacheEviction_LRU(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server with small cache size - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - server.maxCacheSize = 2 // Only keep 2 entries - server.cacheTTL = 1 * time.Hour // Long TTL so we test LRU - defer server.Stop() - - // Create three test databases - for i := 1; i <= 3; i++ { - dbPath := filepath.Join(tmpDir, "repo"+string(rune('0'+i)), ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - } - - // Access repos 1 and 2 - req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req1) - if err != nil { - t.Fatal(err) - } - time.Sleep(10 * time.Millisecond) // Ensure different timestamps - - req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")} - _, err = server.getStorageForRequest(req2) - if err != nil { - t.Fatal(err) - } - - // Verify 2 entries cached - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 2 { - t.Fatalf("expected 2 cached entries, got %d", cacheSize) - } - - // Access repo 3, which should trigger LRU eviction of repo1 (oldest) - req3 := &Request{Cwd: filepath.Join(tmpDir, "repo3")} - _, err = server.getStorageForRequest(req3) - if err != nil { - t.Fatal(err) - } - - // Run eviction to enforce max cache size - server.evictStaleStorage() - - // Should still have 2 entries - server.cacheMu.RLock() - cacheSize = len(server.storageCache) - _, hasRepo1 := server.storageCache[filepath.Join(tmpDir, "repo1")] - _, hasRepo2 := server.storageCache[filepath.Join(tmpDir, "repo2")] - _, hasRepo3 := server.storageCache[filepath.Join(tmpDir, "repo3")] - server.cacheMu.RUnlock() - - if cacheSize != 2 { - t.Fatalf("expected 2 cached entries after LRU eviction, got %d", cacheSize) - } - - // Repo1 should be evicted (oldest), repo2 and repo3 should remain - if hasRepo1 { - t.Error("repo1 should have been evicted (oldest)") - } - if !hasRepo2 { - t.Error("repo2 should still be cached") - } - if !hasRepo3 { - t.Error("repo3 should be cached") - } -} - -func TestStorageCacheEviction_LastAccessUpdate(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - defer server.Stop() - - // Create test database - dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - - // First access - req := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatal(err) - } - - // Get initial lastAccess time - server.cacheMu.RLock() - entry := server.storageCache[filepath.Join(tmpDir, "repo1")] - initialTime := entry.lastAccess - server.cacheMu.RUnlock() - - // Wait a bit - time.Sleep(50 * time.Millisecond) - - // Access again - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatal(err) - } - - // Verify lastAccess was updated - server.cacheMu.RLock() - entry = server.storageCache[filepath.Join(tmpDir, "repo1")] - updatedTime := entry.lastAccess - server.cacheMu.RUnlock() - - if !updatedTime.After(initialTime) { - t.Errorf("lastAccess should be updated on cache hit, initial: %v, updated: %v", initialTime, updatedTime) - } -} - -func TestStorageCacheEviction_EnvVars(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Set env vars - os.Setenv("BEADS_DAEMON_MAX_CACHE_SIZE", "100") - os.Setenv("BEADS_DAEMON_CACHE_TTL", "1h30m") - defer os.Unsetenv("BEADS_DAEMON_MAX_CACHE_SIZE") - defer os.Unsetenv("BEADS_DAEMON_CACHE_TTL") - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - defer server.Stop() - - // Verify config was parsed - if server.maxCacheSize != 100 { - t.Errorf("expected maxCacheSize=100, got %d", server.maxCacheSize) - } - expectedTTL := 90 * time.Minute - if server.cacheTTL != expectedTTL { - t.Errorf("expected cacheTTL=%v, got %v", expectedTTL, server.cacheTTL) - } -} - -func TestStorageCacheEviction_CleanupOnStop(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - - // Create test database and populate cache - dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - - req := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatal(err) - } - - // Verify cached - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 1 { - t.Fatalf("expected 1 cached entry, got %d", cacheSize) - } - - // Stop server - if err := server.Stop(); err != nil { - t.Fatal(err) - } - - // Verify cache was cleared - server.cacheMu.RLock() - cacheSize = len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 0 { - t.Errorf("expected cache to be cleared on stop, got %d entries", cacheSize) - } -} - -func TestStorageCacheEviction_CanonicalKey(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - defer server.Stop() - - // Create test database - dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - - // Access from different subdirectories of the same repo - req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req1) - if err != nil { - t.Fatal(err) - } - - req2 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1")} - _, err = server.getStorageForRequest(req2) - if err != nil { - t.Fatal(err) - } - - req3 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1", "subdir2")} - _, err = server.getStorageForRequest(req3) - if err != nil { - t.Fatal(err) - } - - // Should only have one cache entry (all pointing to same repo root) - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 1 { - t.Errorf("expected 1 cached entry (canonical key), got %d", cacheSize) - } -} - -func TestStorageCacheEviction_ImmediateLRU(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server with max cache size of 2 - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - server.maxCacheSize = 2 - server.cacheTTL = 1 * time.Hour // Long TTL - defer server.Stop() - - // Create 3 test databases - for i := 1; i <= 3; i++ { - dbPath := filepath.Join(tmpDir, fmt.Sprintf("repo%d", i), ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - } - - // Access all 3 repos - for i := 1; i <= 3; i++ { - req := &Request{Cwd: filepath.Join(tmpDir, fmt.Sprintf("repo%d", i))} - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatal(err) - } - time.Sleep(10 * time.Millisecond) // Ensure different timestamps - } - - // Cache should never exceed maxCacheSize (immediate LRU enforcement) - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize > server.maxCacheSize { - t.Errorf("cache size %d exceeds max %d (immediate LRU not enforced)", cacheSize, server.maxCacheSize) - } -} - -func TestStorageCacheEviction_InvalidTTL(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Set invalid TTL - os.Setenv("BEADS_DAEMON_CACHE_TTL", "-5m") - defer os.Unsetenv("BEADS_DAEMON_CACHE_TTL") - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - defer server.Stop() - - // Should fall back to default (30 minutes) - expectedTTL := 30 * time.Minute - if server.cacheTTL != expectedTTL { - t.Errorf("expected TTL to fall back to %v for invalid value, got %v", expectedTTL, server.cacheTTL) - } -} - -func TestStorageCacheEviction_ReopenAfterEviction(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server with short TTL - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - server.cacheTTL = 50 * time.Millisecond - defer server.Stop() - - // Create test database - dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db") - os.MkdirAll(filepath.Dir(dbPath), 0755) - store, err := sqlite.New(dbPath) - if err != nil { - t.Fatal(err) - } - store.Close() - - // Access repo - req := &Request{Cwd: filepath.Join(tmpDir, "repo1")} - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatal(err) - } - - // Wait for TTL to expire - time.Sleep(100 * time.Millisecond) - - // Evict - server.evictStaleStorage() - - // Verify evicted - server.cacheMu.RLock() - cacheSize := len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 0 { - t.Fatalf("expected cache to be empty after eviction, got %d", cacheSize) - } - - // Access again - should cleanly re-open - _, err = server.getStorageForRequest(req) - if err != nil { - t.Fatalf("failed to re-open after eviction: %v", err) - } - - // Verify re-cached - server.cacheMu.RLock() - cacheSize = len(server.storageCache) - server.cacheMu.RUnlock() - if cacheSize != 1 { - t.Errorf("expected 1 cached entry after re-open, got %d", cacheSize) - } -} - -func TestStorageCacheEviction_StopIdempotent(t *testing.T) { - tmpDir := t.TempDir() - - // Create main DB - mainDB := filepath.Join(tmpDir, "main.db") - mainStore, err := sqlite.New(mainDB) - if err != nil { - t.Fatal(err) - } - defer mainStore.Close() - - // Create server - socketPath := filepath.Join(tmpDir, "test.sock") - server := NewServer(socketPath, mainStore, tmpDir, mainDB) - - // Stop multiple times - should not panic - if err := server.Stop(); err != nil { - t.Fatalf("first Stop failed: %v", err) - } - if err := server.Stop(); err != nil { - t.Fatalf("second Stop failed: %v", err) - } - if err := server.Stop(); err != nil { - t.Fatalf("third Stop failed: %v", err) - } -} diff --git a/internal/rpc/server_lifecycle_conn.go b/internal/rpc/server_lifecycle_conn.go index 6412daa6..44b1f5f4 100644 --- a/internal/rpc/server_lifecycle_conn.go +++ b/internal/rpc/server_lifecycle_conn.go @@ -12,8 +12,6 @@ import ( "runtime" "sync/atomic" "time" - - "github.com/steveyegge/beads/internal/storage" ) // Start starts the RPC server and listens for connections diff --git a/internal/rpc/server_routing_validation_diagnostics.go b/internal/rpc/server_routing_validation_diagnostics.go index 13550cfb..4f2cf21f 100644 --- a/internal/rpc/server_routing_validation_diagnostics.go +++ b/internal/rpc/server_routing_validation_diagnostics.go @@ -78,22 +78,8 @@ func (s *Server) validateDatabaseBinding(req *Request) error { return nil } - // For multi-database daemons: If a cwd is provided, verify the client expects - // the database that would be selected for that cwd - var daemonDB string - if req.Cwd != "" { - // Use the database discovery logic to find which DB would be used - dbPath := s.findDatabaseForCwd(req.Cwd) - if dbPath != "" { - daemonDB = dbPath - } else { - // No database found for cwd, will fall back to default storage - daemonDB = s.storage.Path() - } - } else { - // No cwd provided, use default storage - daemonDB = s.storage.Path() - } + // Local daemon always uses single storage + daemonDB := s.storage.Path() // Normalize both paths for comparison (resolve symlinks, clean paths) expectedPath, err := filepath.EvalSymlinks(req.ExpectedDB) @@ -312,10 +298,6 @@ func (s *Server) handleHealth(req *Request) Response { status = "degraded" } - s.cacheMu.RLock() - cacheSize := len(s.storageCache) - s.cacheMu.RUnlock() - // Check version compatibility compatible := true if req.ClientVersion != "" { @@ -330,9 +312,6 @@ func (s *Server) handleHealth(req *Request) Response { ClientVersion: req.ClientVersion, Compatible: compatible, Uptime: time.Since(s.startTime).Seconds(), - CacheSize: cacheSize, - CacheHits: atomic.LoadInt64(&s.cacheHits), - CacheMisses: atomic.LoadInt64(&s.cacheMisses), DBResponseTime: dbResponseMs, ActiveConns: atomic.LoadInt32(&s.activeConns), MaxConns: s.maxConns, @@ -352,14 +331,7 @@ func (s *Server) handleHealth(req *Request) Response { } func (s *Server) handleMetrics(_ *Request) Response { - s.cacheMu.RLock() - cacheSize := len(s.storageCache) - s.cacheMu.RUnlock() - snapshot := s.metrics.Snapshot( - atomic.LoadInt64(&s.cacheHits), - atomic.LoadInt64(&s.cacheMisses), - cacheSize, int(atomic.LoadInt32(&s.activeConns)), )