Remove daemon storage cache (bd-33, bd-34, bd-35)
- Deleted server_cache_storage.go (~300 lines) - Removed cache fields from Server struct - Simplified database routing to use s.storage directly - Removed cache metrics from health and metrics endpoints - Deleted server_eviction_test.go (cache eviction tests) - Cleaned up limits_test.go (removed cache assertions) - All tests passing
This commit is contained in:
@@ -386,15 +386,6 @@ func showDaemonHealth(global bool) {
|
||||
|
||||
fmt.Printf(" Version: %s\n", health.Version)
|
||||
fmt.Printf(" Uptime: %s\n", formatUptime(health.Uptime))
|
||||
fmt.Printf(" Cache Size: %d databases\n", health.CacheSize)
|
||||
fmt.Printf(" Cache Hits: %d\n", health.CacheHits)
|
||||
fmt.Printf(" Cache Misses: %d\n", health.CacheMisses)
|
||||
|
||||
if health.CacheHits+health.CacheMisses > 0 {
|
||||
hitRate := float64(health.CacheHits) / float64(health.CacheHits+health.CacheMisses) * 100
|
||||
fmt.Printf(" Cache Hit Rate: %.1f%%\n", hitRate)
|
||||
}
|
||||
|
||||
fmt.Printf(" DB Response Time: %.2f ms\n", health.DBResponseTime)
|
||||
|
||||
if health.Error != "" {
|
||||
@@ -455,17 +446,6 @@ func showDaemonMetrics(global bool) {
|
||||
fmt.Printf("Uptime: %.1f seconds (%.1f minutes)\n", metrics.UptimeSeconds, metrics.UptimeSeconds/60)
|
||||
fmt.Printf("Timestamp: %s\n\n", metrics.Timestamp.Format(time.RFC3339))
|
||||
|
||||
// Cache metrics
|
||||
fmt.Printf("Cache Metrics:\n")
|
||||
fmt.Printf(" Size: %d databases\n", metrics.CacheSize)
|
||||
fmt.Printf(" Hits: %d\n", metrics.CacheHits)
|
||||
fmt.Printf(" Misses: %d\n", metrics.CacheMisses)
|
||||
if metrics.CacheHits+metrics.CacheMisses > 0 {
|
||||
hitRate := float64(metrics.CacheHits) / float64(metrics.CacheHits+metrics.CacheMisses) * 100
|
||||
fmt.Printf(" Hit Rate: %.1f%%\n", hitRate)
|
||||
}
|
||||
fmt.Printf(" Evictions: %d\n\n", metrics.CacheEvictions)
|
||||
|
||||
// Connection metrics
|
||||
fmt.Printf("Connection Metrics:\n")
|
||||
fmt.Printf(" Total: %d\n", metrics.TotalConns)
|
||||
|
||||
@@ -74,8 +74,8 @@ func TryConnectWithTimeout(socketPath string, dialTimeout time.Duration) (*Clien
|
||||
}
|
||||
|
||||
if os.Getenv("BD_DEBUG") != "" {
|
||||
fmt.Fprintf(os.Stderr, "Debug: connected to daemon (status: %s, uptime: %.1fs, cache: %d)\n",
|
||||
health.Status, health.Uptime, health.CacheSize)
|
||||
fmt.Fprintf(os.Stderr, "Debug: connected to daemon (status: %s, uptime: %.1fs)\n",
|
||||
health.Status, health.Uptime)
|
||||
}
|
||||
|
||||
return client, nil
|
||||
|
||||
@@ -4,11 +4,9 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@@ -200,59 +198,6 @@ func TestRequestTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryPressureDetection(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("memory pressure detection thresholds are not reliable on Windows")
|
||||
}
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
dbPath := filepath.Join(tmpDir, ".beads", "test.db")
|
||||
if err := os.MkdirAll(filepath.Dir(dbPath), 0750); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
|
||||
srv := NewServer(socketPath, store, tmpDir, dbPath)
|
||||
|
||||
// Add some entries to cache
|
||||
srv.cacheMu.Lock()
|
||||
for i := 0; i < 10; i++ {
|
||||
path := fmt.Sprintf("/test/path/%d", i)
|
||||
srv.storageCache[path] = &StorageCacheEntry{
|
||||
store: store,
|
||||
lastAccess: time.Now().Add(-time.Duration(i) * time.Minute),
|
||||
}
|
||||
}
|
||||
initialSize := len(srv.storageCache)
|
||||
srv.cacheMu.Unlock()
|
||||
|
||||
// Trigger aggressive eviction directly (should evict 50% of entries)
|
||||
srv.aggressiveEviction()
|
||||
|
||||
// Check that some entries were evicted
|
||||
srv.cacheMu.RLock()
|
||||
finalSize := len(srv.storageCache)
|
||||
srv.cacheMu.RUnlock()
|
||||
|
||||
if finalSize >= initialSize {
|
||||
t.Errorf("expected cache eviction, but size went from %d to %d", initialSize, finalSize)
|
||||
}
|
||||
|
||||
expectedSize := initialSize / 2
|
||||
if finalSize != expectedSize {
|
||||
t.Errorf("expected exactly %d entries after evicting 50%%, got %d", expectedSize, finalSize)
|
||||
}
|
||||
|
||||
t.Logf("Cache evicted: %d -> %d entries", initialSize, finalSize)
|
||||
}
|
||||
|
||||
func TestHealthResponseIncludesLimits(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "bd-limits-test-*")
|
||||
if err != nil {
|
||||
|
||||
@@ -23,9 +23,6 @@ type Metrics struct {
|
||||
totalConns int64
|
||||
rejectedConns int64
|
||||
|
||||
// Cache metrics (handled separately via atomic in Server)
|
||||
cacheEvictions int64
|
||||
|
||||
// System start time (for uptime calculation)
|
||||
startTime time.Time
|
||||
}
|
||||
@@ -76,13 +73,8 @@ func (m *Metrics) RecordRejectedConnection() {
|
||||
atomic.AddInt64(&m.rejectedConns, 1)
|
||||
}
|
||||
|
||||
// RecordCacheEviction records a cache eviction event
|
||||
func (m *Metrics) RecordCacheEviction() {
|
||||
atomic.AddInt64(&m.cacheEvictions, 1)
|
||||
}
|
||||
|
||||
// Snapshot returns a point-in-time snapshot of all metrics
|
||||
func (m *Metrics) Snapshot(cacheHits, cacheMisses int64, cacheSize, activeConns int) MetricsSnapshot {
|
||||
func (m *Metrics) Snapshot(activeConns int) MetricsSnapshot {
|
||||
// Copy data under a short critical section
|
||||
m.mu.RLock()
|
||||
|
||||
@@ -161,10 +153,6 @@ func (m *Metrics) Snapshot(cacheHits, cacheMisses int64, cacheSize, activeConns
|
||||
Timestamp: time.Now(),
|
||||
UptimeSeconds: uptimeSeconds,
|
||||
Operations: operations,
|
||||
CacheHits: cacheHits,
|
||||
CacheMisses: cacheMisses,
|
||||
CacheSize: cacheSize,
|
||||
CacheEvictions: atomic.LoadInt64(&m.cacheEvictions),
|
||||
TotalConns: atomic.LoadInt64(&m.totalConns),
|
||||
ActiveConns: activeConns,
|
||||
RejectedConns: atomic.LoadInt64(&m.rejectedConns),
|
||||
@@ -179,10 +167,6 @@ type MetricsSnapshot struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
UptimeSeconds float64 `json:"uptime_seconds"`
|
||||
Operations []OperationMetrics `json:"operations"`
|
||||
CacheHits int64 `json:"cache_hits"`
|
||||
CacheMisses int64 `json:"cache_misses"`
|
||||
CacheSize int `json:"cache_size"`
|
||||
CacheEvictions int64 `json:"cache_evictions"`
|
||||
TotalConns int64 `json:"total_connections"`
|
||||
ActiveConns int `json:"active_connections"`
|
||||
RejectedConns int64 `json:"rejected_connections"`
|
||||
|
||||
@@ -52,16 +52,6 @@ func TestMetricsRecording(t *testing.T) {
|
||||
t.Errorf("Expected rejected count to increase by 1, got %d -> %d", before, after)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("record cache eviction", func(t *testing.T) {
|
||||
before := m.cacheEvictions
|
||||
m.RecordCacheEviction()
|
||||
after := m.cacheEvictions
|
||||
|
||||
if after != before+1 {
|
||||
t.Errorf("Expected eviction count to increase by 1, got %d -> %d", before, after)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestMetricsSnapshot(t *testing.T) {
|
||||
@@ -74,10 +64,9 @@ func TestMetricsSnapshot(t *testing.T) {
|
||||
m.RecordError("create")
|
||||
m.RecordConnection()
|
||||
m.RecordRejectedConnection()
|
||||
m.RecordCacheEviction()
|
||||
|
||||
// Take snapshot
|
||||
snapshot := m.Snapshot(100, 10, 50, 3)
|
||||
snapshot := m.Snapshot(3)
|
||||
|
||||
t.Run("basic metrics", func(t *testing.T) {
|
||||
if snapshot.TotalConns < 1 {
|
||||
@@ -86,18 +75,6 @@ func TestMetricsSnapshot(t *testing.T) {
|
||||
if snapshot.RejectedConns < 1 {
|
||||
t.Error("Expected at least 1 rejected connection")
|
||||
}
|
||||
if snapshot.CacheEvictions < 1 {
|
||||
t.Error("Expected at least 1 cache eviction")
|
||||
}
|
||||
if snapshot.CacheHits != 100 {
|
||||
t.Errorf("Expected 100 cache hits, got %d", snapshot.CacheHits)
|
||||
}
|
||||
if snapshot.CacheMisses != 10 {
|
||||
t.Errorf("Expected 10 cache misses, got %d", snapshot.CacheMisses)
|
||||
}
|
||||
if snapshot.CacheSize != 50 {
|
||||
t.Errorf("Expected cache size 50, got %d", snapshot.CacheSize)
|
||||
}
|
||||
if snapshot.ActiveConns != 3 {
|
||||
t.Errorf("Expected 3 active connections, got %d", snapshot.ActiveConns)
|
||||
}
|
||||
|
||||
@@ -187,9 +187,6 @@ type HealthResponse struct {
|
||||
ClientVersion string `json:"client_version,omitempty"` // Client version from request
|
||||
Compatible bool `json:"compatible"` // Whether versions are compatible
|
||||
Uptime float64 `json:"uptime_seconds"`
|
||||
CacheSize int `json:"cache_size"`
|
||||
CacheHits int64 `json:"cache_hits"`
|
||||
CacheMisses int64 `json:"cache_misses"`
|
||||
DBResponseTime float64 `json:"db_response_ms"`
|
||||
ActiveConns int32 `json:"active_connections"`
|
||||
MaxConns int `json:"max_connections"`
|
||||
|
||||
@@ -1,286 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
)
|
||||
|
||||
// StorageCacheEntry holds a cached storage with metadata for eviction
|
||||
type StorageCacheEntry struct {
|
||||
store storage.Storage
|
||||
lastAccess time.Time
|
||||
dbMtime time.Time // DB file modification time for detecting external changes
|
||||
}
|
||||
|
||||
// runCleanupLoop periodically evicts stale storage connections and checks memory pressure
|
||||
func (s *Server) runCleanupLoop() {
|
||||
s.cleanupTicker = time.NewTicker(5 * time.Minute)
|
||||
defer s.cleanupTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.cleanupTicker.C:
|
||||
s.checkMemoryPressure()
|
||||
s.evictStaleStorage()
|
||||
case <-s.shutdownChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkMemoryPressure monitors memory usage and triggers aggressive eviction if needed
|
||||
func (s *Server) checkMemoryPressure() {
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
|
||||
// Memory thresholds (configurable via env var)
|
||||
const defaultThresholdMB = 500
|
||||
thresholdMB := defaultThresholdMB
|
||||
if env := os.Getenv("BEADS_DAEMON_MEMORY_THRESHOLD_MB"); env != "" {
|
||||
var threshold int
|
||||
if _, err := fmt.Sscanf(env, "%d", &threshold); err == nil && threshold > 0 {
|
||||
thresholdMB = threshold
|
||||
}
|
||||
}
|
||||
|
||||
allocMB := m.Alloc / 1024 / 1024
|
||||
// #nosec G115 - safe conversion of positive value
|
||||
if allocMB > uint64(thresholdMB) {
|
||||
fmt.Fprintf(os.Stderr, "Warning: High memory usage detected (%d MB), triggering aggressive cache eviction\n", allocMB)
|
||||
s.aggressiveEviction()
|
||||
runtime.GC() // Suggest garbage collection
|
||||
}
|
||||
}
|
||||
|
||||
// aggressiveEviction evicts 50% of cached storage to reduce memory pressure
|
||||
func (s *Server) aggressiveEviction() {
|
||||
toClose := []storage.Storage{}
|
||||
|
||||
s.cacheMu.Lock()
|
||||
|
||||
if len(s.storageCache) == 0 {
|
||||
s.cacheMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// Build sorted list by last access
|
||||
type cacheItem struct {
|
||||
path string
|
||||
entry *StorageCacheEntry
|
||||
}
|
||||
items := make([]cacheItem, 0, len(s.storageCache))
|
||||
for path, entry := range s.storageCache {
|
||||
items = append(items, cacheItem{path, entry})
|
||||
}
|
||||
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].entry.lastAccess.Before(items[j].entry.lastAccess)
|
||||
})
|
||||
|
||||
// Evict oldest 50%
|
||||
numToEvict := len(items) / 2
|
||||
for i := 0; i < numToEvict; i++ {
|
||||
toClose = append(toClose, items[i].entry.store)
|
||||
delete(s.storageCache, items[i].path)
|
||||
}
|
||||
|
||||
s.cacheMu.Unlock()
|
||||
|
||||
// Close without holding lock
|
||||
for _, store := range toClose {
|
||||
if err := store.Close(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// evictStaleStorage removes idle connections and enforces cache size limits
|
||||
func (s *Server) evictStaleStorage() {
|
||||
now := time.Now()
|
||||
toClose := []storage.Storage{}
|
||||
|
||||
s.cacheMu.Lock()
|
||||
|
||||
// First pass: evict TTL-expired entries
|
||||
for path, entry := range s.storageCache {
|
||||
if now.Sub(entry.lastAccess) > s.cacheTTL {
|
||||
toClose = append(toClose, entry.store)
|
||||
delete(s.storageCache, path)
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: enforce max cache size with LRU eviction
|
||||
if len(s.storageCache) > s.maxCacheSize {
|
||||
// Build sorted list of entries by lastAccess
|
||||
type cacheItem struct {
|
||||
path string
|
||||
entry *StorageCacheEntry
|
||||
}
|
||||
items := make([]cacheItem, 0, len(s.storageCache))
|
||||
for path, entry := range s.storageCache {
|
||||
items = append(items, cacheItem{path, entry})
|
||||
}
|
||||
|
||||
// Sort by lastAccess (oldest first) with sort.Slice
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].entry.lastAccess.Before(items[j].entry.lastAccess)
|
||||
})
|
||||
|
||||
// Evict oldest entries until we're under the limit
|
||||
numToEvict := len(s.storageCache) - s.maxCacheSize
|
||||
for i := 0; i < numToEvict && i < len(items); i++ {
|
||||
toClose = append(toClose, items[i].entry.store)
|
||||
delete(s.storageCache, items[i].path)
|
||||
s.metrics.RecordCacheEviction()
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock before closing to avoid holding lock during Close
|
||||
s.cacheMu.Unlock()
|
||||
|
||||
// Close connections synchronously
|
||||
for _, store := range toClose {
|
||||
if err := store.Close(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getStorageForRequest returns the appropriate storage for the request
|
||||
// If req.Cwd is set, it finds the database for that directory
|
||||
// Otherwise, it uses the default storage
|
||||
func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
|
||||
// If no cwd specified, use default storage
|
||||
if req.Cwd == "" {
|
||||
return s.storage, nil
|
||||
}
|
||||
|
||||
// Find database for this cwd (to get the canonical repo root)
|
||||
dbPath := s.findDatabaseForCwd(req.Cwd)
|
||||
if dbPath == "" {
|
||||
return nil, fmt.Errorf("no .beads database found for path: %s", req.Cwd)
|
||||
}
|
||||
|
||||
// Canonicalize key to repo root (parent of .beads directory)
|
||||
beadsDir := filepath.Dir(dbPath)
|
||||
repoRoot := filepath.Dir(beadsDir)
|
||||
|
||||
// Check cache first with write lock (to avoid race on lastAccess update)
|
||||
s.cacheMu.Lock()
|
||||
defer s.cacheMu.Unlock()
|
||||
|
||||
if entry, ok := s.storageCache[repoRoot]; ok {
|
||||
// Check if DB file has been modified externally
|
||||
info, err := os.Stat(dbPath)
|
||||
if err == nil && !info.ModTime().Equal(entry.dbMtime) {
|
||||
// DB file changed - evict stale cache entry
|
||||
// Remove from cache first to prevent concurrent access
|
||||
delete(s.storageCache, repoRoot)
|
||||
atomic.AddInt64(&s.cacheMisses, 1)
|
||||
// Close storage after removing from cache (safe now)
|
||||
// Unlock briefly to avoid blocking during Close()
|
||||
s.cacheMu.Unlock()
|
||||
if err := entry.store.Close(); err != nil {
|
||||
// Log but don't fail - we'll reopen anyway
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to close stale cached storage: %v\n", err)
|
||||
}
|
||||
s.cacheMu.Lock()
|
||||
// Fall through to reopen
|
||||
} else if err == nil {
|
||||
// Cache hit - DB file unchanged
|
||||
entry.lastAccess = time.Now()
|
||||
atomic.AddInt64(&s.cacheHits, 1)
|
||||
return entry.store, nil
|
||||
} else {
|
||||
// Stat failed - evict and reopen
|
||||
// Remove from cache first to prevent concurrent access
|
||||
delete(s.storageCache, repoRoot)
|
||||
atomic.AddInt64(&s.cacheMisses, 1)
|
||||
// Close storage after removing from cache
|
||||
s.cacheMu.Unlock()
|
||||
if err := entry.store.Close(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to close cached storage: %v\n", err)
|
||||
}
|
||||
s.cacheMu.Lock()
|
||||
// Fall through to reopen
|
||||
}
|
||||
} else {
|
||||
atomic.AddInt64(&s.cacheMisses, 1)
|
||||
}
|
||||
|
||||
// Open storage
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open database at %s: %w", dbPath, err)
|
||||
}
|
||||
|
||||
// Get mtime for the newly opened DB
|
||||
info, err := os.Stat(dbPath)
|
||||
if err != nil {
|
||||
// If we can't stat, still cache it but with zero mtime (will invalidate on next check)
|
||||
info = nil
|
||||
}
|
||||
|
||||
mtime := time.Time{}
|
||||
if info != nil {
|
||||
mtime = info.ModTime()
|
||||
}
|
||||
|
||||
// Cache it with current timestamp and mtime
|
||||
s.storageCache[repoRoot] = &StorageCacheEntry{
|
||||
store: store,
|
||||
lastAccess: time.Now(),
|
||||
dbMtime: mtime,
|
||||
}
|
||||
|
||||
// Enforce LRU immediately to prevent FD spikes between cleanup ticks
|
||||
needEvict := len(s.storageCache) > s.maxCacheSize
|
||||
s.cacheMu.Unlock()
|
||||
|
||||
if needEvict {
|
||||
s.evictStaleStorage()
|
||||
}
|
||||
|
||||
// Re-acquire lock for defer
|
||||
s.cacheMu.Lock()
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// findDatabaseForCwd walks up from cwd to find .beads/*.db
|
||||
func (s *Server) findDatabaseForCwd(cwd string) string {
|
||||
dir, err := filepath.Abs(cwd)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Walk up directory tree
|
||||
for {
|
||||
beadsDir := filepath.Join(dir, ".beads")
|
||||
if info, err := os.Stat(beadsDir); err == nil && info.IsDir() {
|
||||
// Found .beads/ directory, look for *.db files
|
||||
matches, err := filepath.Glob(filepath.Join(beadsDir, "*.db"))
|
||||
if err == nil && len(matches) > 0 {
|
||||
return matches[0]
|
||||
}
|
||||
}
|
||||
|
||||
// Move up one directory
|
||||
parent := filepath.Dir(dir)
|
||||
if parent == dir {
|
||||
// Reached filesystem root
|
||||
break
|
||||
}
|
||||
dir = parent
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
@@ -1,525 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
)
|
||||
|
||||
func TestStorageCacheEviction_TTL(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server with short TTL for testing
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
server.cacheTTL = 100 * time.Millisecond // Short TTL for testing
|
||||
defer server.Stop()
|
||||
|
||||
// Create two test databases
|
||||
db1 := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(db1), 0755)
|
||||
store1, err := sqlite.New(db1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store1.Close()
|
||||
|
||||
db2 := filepath.Join(tmpDir, "repo2", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(db2), 0755)
|
||||
store2, err := sqlite.New(db2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store2.Close()
|
||||
|
||||
// Access both repos to populate cache
|
||||
req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")}
|
||||
_, err = server.getStorageForRequest(req2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify both are cached
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 2 {
|
||||
t.Fatalf("expected 2 cached entries, got %d", cacheSize)
|
||||
}
|
||||
|
||||
// Wait for TTL to expire
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
// Run eviction
|
||||
server.evictStaleStorage()
|
||||
|
||||
// Verify both entries were evicted
|
||||
server.cacheMu.RLock()
|
||||
cacheSize = len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 0 {
|
||||
t.Fatalf("expected 0 cached entries after TTL eviction, got %d", cacheSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_LRU(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server with small cache size
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
server.maxCacheSize = 2 // Only keep 2 entries
|
||||
server.cacheTTL = 1 * time.Hour // Long TTL so we test LRU
|
||||
defer server.Stop()
|
||||
|
||||
// Create three test databases
|
||||
for i := 1; i <= 3; i++ {
|
||||
dbPath := filepath.Join(tmpDir, "repo"+string(rune('0'+i)), ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
}
|
||||
|
||||
// Access repos 1 and 2
|
||||
req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) // Ensure different timestamps
|
||||
|
||||
req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")}
|
||||
_, err = server.getStorageForRequest(req2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify 2 entries cached
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 2 {
|
||||
t.Fatalf("expected 2 cached entries, got %d", cacheSize)
|
||||
}
|
||||
|
||||
// Access repo 3, which should trigger LRU eviction of repo1 (oldest)
|
||||
req3 := &Request{Cwd: filepath.Join(tmpDir, "repo3")}
|
||||
_, err = server.getStorageForRequest(req3)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Run eviction to enforce max cache size
|
||||
server.evictStaleStorage()
|
||||
|
||||
// Should still have 2 entries
|
||||
server.cacheMu.RLock()
|
||||
cacheSize = len(server.storageCache)
|
||||
_, hasRepo1 := server.storageCache[filepath.Join(tmpDir, "repo1")]
|
||||
_, hasRepo2 := server.storageCache[filepath.Join(tmpDir, "repo2")]
|
||||
_, hasRepo3 := server.storageCache[filepath.Join(tmpDir, "repo3")]
|
||||
server.cacheMu.RUnlock()
|
||||
|
||||
if cacheSize != 2 {
|
||||
t.Fatalf("expected 2 cached entries after LRU eviction, got %d", cacheSize)
|
||||
}
|
||||
|
||||
// Repo1 should be evicted (oldest), repo2 and repo3 should remain
|
||||
if hasRepo1 {
|
||||
t.Error("repo1 should have been evicted (oldest)")
|
||||
}
|
||||
if !hasRepo2 {
|
||||
t.Error("repo2 should still be cached")
|
||||
}
|
||||
if !hasRepo3 {
|
||||
t.Error("repo3 should be cached")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_LastAccessUpdate(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
defer server.Stop()
|
||||
|
||||
// Create test database
|
||||
dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
|
||||
// First access
|
||||
req := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Get initial lastAccess time
|
||||
server.cacheMu.RLock()
|
||||
entry := server.storageCache[filepath.Join(tmpDir, "repo1")]
|
||||
initialTime := entry.lastAccess
|
||||
server.cacheMu.RUnlock()
|
||||
|
||||
// Wait a bit
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Access again
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify lastAccess was updated
|
||||
server.cacheMu.RLock()
|
||||
entry = server.storageCache[filepath.Join(tmpDir, "repo1")]
|
||||
updatedTime := entry.lastAccess
|
||||
server.cacheMu.RUnlock()
|
||||
|
||||
if !updatedTime.After(initialTime) {
|
||||
t.Errorf("lastAccess should be updated on cache hit, initial: %v, updated: %v", initialTime, updatedTime)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_EnvVars(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Set env vars
|
||||
os.Setenv("BEADS_DAEMON_MAX_CACHE_SIZE", "100")
|
||||
os.Setenv("BEADS_DAEMON_CACHE_TTL", "1h30m")
|
||||
defer os.Unsetenv("BEADS_DAEMON_MAX_CACHE_SIZE")
|
||||
defer os.Unsetenv("BEADS_DAEMON_CACHE_TTL")
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
defer server.Stop()
|
||||
|
||||
// Verify config was parsed
|
||||
if server.maxCacheSize != 100 {
|
||||
t.Errorf("expected maxCacheSize=100, got %d", server.maxCacheSize)
|
||||
}
|
||||
expectedTTL := 90 * time.Minute
|
||||
if server.cacheTTL != expectedTTL {
|
||||
t.Errorf("expected cacheTTL=%v, got %v", expectedTTL, server.cacheTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_CleanupOnStop(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
|
||||
// Create test database and populate cache
|
||||
dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
|
||||
req := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify cached
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 1 {
|
||||
t.Fatalf("expected 1 cached entry, got %d", cacheSize)
|
||||
}
|
||||
|
||||
// Stop server
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify cache was cleared
|
||||
server.cacheMu.RLock()
|
||||
cacheSize = len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 0 {
|
||||
t.Errorf("expected cache to be cleared on stop, got %d entries", cacheSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_CanonicalKey(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
defer server.Stop()
|
||||
|
||||
// Create test database
|
||||
dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
|
||||
// Access from different subdirectories of the same repo
|
||||
req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
req2 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1")}
|
||||
_, err = server.getStorageForRequest(req2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
req3 := &Request{Cwd: filepath.Join(tmpDir, "repo1", "subdir1", "subdir2")}
|
||||
_, err = server.getStorageForRequest(req3)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Should only have one cache entry (all pointing to same repo root)
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 1 {
|
||||
t.Errorf("expected 1 cached entry (canonical key), got %d", cacheSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_ImmediateLRU(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server with max cache size of 2
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
server.maxCacheSize = 2
|
||||
server.cacheTTL = 1 * time.Hour // Long TTL
|
||||
defer server.Stop()
|
||||
|
||||
// Create 3 test databases
|
||||
for i := 1; i <= 3; i++ {
|
||||
dbPath := filepath.Join(tmpDir, fmt.Sprintf("repo%d", i), ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
}
|
||||
|
||||
// Access all 3 repos
|
||||
for i := 1; i <= 3; i++ {
|
||||
req := &Request{Cwd: filepath.Join(tmpDir, fmt.Sprintf("repo%d", i))}
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) // Ensure different timestamps
|
||||
}
|
||||
|
||||
// Cache should never exceed maxCacheSize (immediate LRU enforcement)
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize > server.maxCacheSize {
|
||||
t.Errorf("cache size %d exceeds max %d (immediate LRU not enforced)", cacheSize, server.maxCacheSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_InvalidTTL(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Set invalid TTL
|
||||
os.Setenv("BEADS_DAEMON_CACHE_TTL", "-5m")
|
||||
defer os.Unsetenv("BEADS_DAEMON_CACHE_TTL")
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
defer server.Stop()
|
||||
|
||||
// Should fall back to default (30 minutes)
|
||||
expectedTTL := 30 * time.Minute
|
||||
if server.cacheTTL != expectedTTL {
|
||||
t.Errorf("expected TTL to fall back to %v for invalid value, got %v", expectedTTL, server.cacheTTL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_ReopenAfterEviction(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server with short TTL
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
server.cacheTTL = 50 * time.Millisecond
|
||||
defer server.Stop()
|
||||
|
||||
// Create test database
|
||||
dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
|
||||
os.MkdirAll(filepath.Dir(dbPath), 0755)
|
||||
store, err := sqlite.New(dbPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
store.Close()
|
||||
|
||||
// Access repo
|
||||
req := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Wait for TTL to expire
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Evict
|
||||
server.evictStaleStorage()
|
||||
|
||||
// Verify evicted
|
||||
server.cacheMu.RLock()
|
||||
cacheSize := len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 0 {
|
||||
t.Fatalf("expected cache to be empty after eviction, got %d", cacheSize)
|
||||
}
|
||||
|
||||
// Access again - should cleanly re-open
|
||||
_, err = server.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to re-open after eviction: %v", err)
|
||||
}
|
||||
|
||||
// Verify re-cached
|
||||
server.cacheMu.RLock()
|
||||
cacheSize = len(server.storageCache)
|
||||
server.cacheMu.RUnlock()
|
||||
if cacheSize != 1 {
|
||||
t.Errorf("expected 1 cached entry after re-open, got %d", cacheSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStorageCacheEviction_StopIdempotent(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
// Create main DB
|
||||
mainDB := filepath.Join(tmpDir, "main.db")
|
||||
mainStore, err := sqlite.New(mainDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer mainStore.Close()
|
||||
|
||||
// Create server
|
||||
socketPath := filepath.Join(tmpDir, "test.sock")
|
||||
server := NewServer(socketPath, mainStore, tmpDir, mainDB)
|
||||
|
||||
// Stop multiple times - should not panic
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Fatalf("first Stop failed: %v", err)
|
||||
}
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Fatalf("second Stop failed: %v", err)
|
||||
}
|
||||
if err := server.Stop(); err != nil {
|
||||
t.Fatalf("third Stop failed: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -12,8 +12,6 @@ import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
)
|
||||
|
||||
// Start starts the RPC server and listens for connections
|
||||
|
||||
@@ -78,22 +78,8 @@ func (s *Server) validateDatabaseBinding(req *Request) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// For multi-database daemons: If a cwd is provided, verify the client expects
|
||||
// the database that would be selected for that cwd
|
||||
var daemonDB string
|
||||
if req.Cwd != "" {
|
||||
// Use the database discovery logic to find which DB would be used
|
||||
dbPath := s.findDatabaseForCwd(req.Cwd)
|
||||
if dbPath != "" {
|
||||
daemonDB = dbPath
|
||||
} else {
|
||||
// No database found for cwd, will fall back to default storage
|
||||
daemonDB = s.storage.Path()
|
||||
}
|
||||
} else {
|
||||
// No cwd provided, use default storage
|
||||
daemonDB = s.storage.Path()
|
||||
}
|
||||
// Local daemon always uses single storage
|
||||
daemonDB := s.storage.Path()
|
||||
|
||||
// Normalize both paths for comparison (resolve symlinks, clean paths)
|
||||
expectedPath, err := filepath.EvalSymlinks(req.ExpectedDB)
|
||||
@@ -312,10 +298,6 @@ func (s *Server) handleHealth(req *Request) Response {
|
||||
status = "degraded"
|
||||
}
|
||||
|
||||
s.cacheMu.RLock()
|
||||
cacheSize := len(s.storageCache)
|
||||
s.cacheMu.RUnlock()
|
||||
|
||||
// Check version compatibility
|
||||
compatible := true
|
||||
if req.ClientVersion != "" {
|
||||
@@ -330,9 +312,6 @@ func (s *Server) handleHealth(req *Request) Response {
|
||||
ClientVersion: req.ClientVersion,
|
||||
Compatible: compatible,
|
||||
Uptime: time.Since(s.startTime).Seconds(),
|
||||
CacheSize: cacheSize,
|
||||
CacheHits: atomic.LoadInt64(&s.cacheHits),
|
||||
CacheMisses: atomic.LoadInt64(&s.cacheMisses),
|
||||
DBResponseTime: dbResponseMs,
|
||||
ActiveConns: atomic.LoadInt32(&s.activeConns),
|
||||
MaxConns: s.maxConns,
|
||||
@@ -352,14 +331,7 @@ func (s *Server) handleHealth(req *Request) Response {
|
||||
}
|
||||
|
||||
func (s *Server) handleMetrics(_ *Request) Response {
|
||||
s.cacheMu.RLock()
|
||||
cacheSize := len(s.storageCache)
|
||||
s.cacheMu.RUnlock()
|
||||
|
||||
snapshot := s.metrics.Snapshot(
|
||||
atomic.LoadInt64(&s.cacheHits),
|
||||
atomic.LoadInt64(&s.cacheMisses),
|
||||
cacheSize,
|
||||
int(atomic.LoadInt32(&s.activeConns)),
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user