Add storage cache eviction policy to daemon (bd-145)

Implemented TTL-based and LRU cache eviction for daemon storage connections:

- Add StorageCacheEntry with lastAccess timestamp tracking
- Cleanup goroutine runs every 5 minutes to evict stale entries
- TTL-based eviction: remove entries idle >30min (configurable)
- LRU eviction: enforce max cache size (default: 50 repos)
- Configurable via BEADS_DAEMON_MAX_CACHE_SIZE and BEADS_DAEMON_CACHE_TTL
- Proper cleanup on server shutdown
- Update lastAccess on cache hits
- Comprehensive tests for eviction logic

Fixes memory leaks and file descriptor exhaustion for multi-repo users.

Amp-Thread-ID: https://ampcode.com/threads/T-1148d8b3-b8a8-45fc-af9c-b5be14c4834d
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-10-18 13:17:07 -07:00
parent 491cb82489
commit 259e994522
3 changed files with 460 additions and 32 deletions

View File

@@ -18,24 +18,54 @@ import (
"github.com/steveyegge/beads/internal/types"
)
// StorageCacheEntry holds a cached storage with metadata for eviction
type StorageCacheEntry struct {
	store      storage.Storage // open storage connection for one repo path
	lastAccess time.Time       // refreshed on every cache hit; drives both TTL and LRU eviction
}
// Server represents the RPC server that runs in the daemon.
//
// NOTE: the rendered diff had merged the pre-change field list with the
// post-change one, duplicating socketPath/storage/listener/mu/shutdown and
// storageCache — invalid Go. Only the post-change field set is kept here.
type Server struct {
	socketPath   string
	storage      storage.Storage // Default storage (for backward compat)
	listener     net.Listener
	mu           sync.RWMutex
	shutdown     bool
	shutdownChan chan struct{} // closed in Stop to halt the cleanup goroutine

	// Per-request storage routing with eviction support
	storageCache  map[string]*StorageCacheEntry // path -> entry; guarded by cacheMu
	cacheMu       sync.RWMutex
	maxCacheSize  int           // LRU bound on len(storageCache)
	cacheTTL      time.Duration // idle time after which an entry is evicted
	cleanupTicker *time.Ticker  // drives evictStaleStorage; stopped in Stop
}
// NewServer creates a new RPC server.
//
// Cache tuning is read from the environment:
//   - BEADS_DAEMON_MAX_CACHE_SIZE: max cached repo connections (default 50)
//   - BEADS_DAEMON_CACHE_TTL: idle eviction duration (default 30m)
//
// Invalid or unset values fall back to the defaults. The rendered diff had
// kept both the old and new `storageCache:` lines in the composite literal
// (a duplicate-key compile error); only the new initializer is kept.
func NewServer(socketPath string, store storage.Storage) *Server {
	maxCacheSize := 50 // default
	if env := os.Getenv("BEADS_DAEMON_MAX_CACHE_SIZE"); env != "" {
		// Parse as integer; non-positive or unparseable values are ignored.
		var size int
		if _, err := fmt.Sscanf(env, "%d", &size); err == nil && size > 0 {
			maxCacheSize = size
		}
	}
	cacheTTL := 30 * time.Minute // default
	if env := os.Getenv("BEADS_DAEMON_CACHE_TTL"); env != "" {
		if ttl, err := time.ParseDuration(env); err == nil {
			cacheTTL = ttl
		}
	}
	return &Server{
		socketPath:   socketPath,
		storage:      store,
		storageCache: make(map[string]*StorageCacheEntry),
		maxCacheSize: maxCacheSize,
		cacheTTL:     cacheTTL,
		shutdownChan: make(chan struct{}),
	}
}
@@ -62,6 +92,7 @@ func (s *Server) Start(ctx context.Context) error {
}
go s.handleSignals()
go s.runCleanupLoop()
for {
conn, err := s.listener.Accept()
@@ -85,6 +116,23 @@ func (s *Server) Stop() error {
s.shutdown = true
s.mu.Unlock()
// Signal cleanup goroutine to stop
close(s.shutdownChan)
if s.cleanupTicker != nil {
s.cleanupTicker.Stop()
}
// Close all cached storage connections
s.cacheMu.Lock()
for _, entry := range s.storageCache {
if err := entry.store.Close(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to close storage: %v\n", err)
}
}
s.storageCache = make(map[string]*StorageCacheEntry)
s.cacheMu.Unlock()
if s.listener != nil {
if err := s.listener.Close(); err != nil {
return fmt.Errorf("failed to close listener: %w", err)
@@ -122,6 +170,76 @@ func (s *Server) handleSignals() {
s.Stop()
}
// runCleanupLoop periodically evicts stale storage connections.
// It runs in its own goroutine (started by Start) until shutdownChan
// is closed by Stop.
func (s *Server) runCleanupLoop() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	// Publish the ticker under the server mutex: the original assigned
	// s.cleanupTicker from this goroutine while Stop read it from another,
	// which is a data race. Driving the loop off the local `ticker` keeps
	// this goroutine independent of that field.
	s.mu.Lock()
	s.cleanupTicker = ticker
	s.mu.Unlock()

	for {
		select {
		case <-ticker.C:
			s.evictStaleStorage()
		case <-s.shutdownChan:
			return
		}
	}
}
// evictStaleStorage removes idle connections and enforces cache size limits
//
// Two passes under cacheMu: (1) drop entries idle longer than cacheTTL,
// (2) if the cache is still over maxCacheSize, drop the least-recently
// accessed entries. Evicted stores are closed asynchronously so slow
// Close() calls never block callers waiting on cacheMu.
func (s *Server) evictStaleStorage() {
	now := time.Now()
	// Stores removed from the map; closed outside the lock (goroutine below).
	toClose := []storage.Storage{}
	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()
	// First pass: evict TTL-expired entries
	for path, entry := range s.storageCache {
		if now.Sub(entry.lastAccess) > s.cacheTTL {
			toClose = append(toClose, entry.store)
			delete(s.storageCache, path)
		}
	}
	// Second pass: enforce max cache size with LRU eviction
	if len(s.storageCache) > s.maxCacheSize {
		// Build sorted list of entries by lastAccess
		type cacheItem struct {
			path  string
			entry *StorageCacheEntry
		}
		items := make([]cacheItem, 0, len(s.storageCache))
		for path, entry := range s.storageCache {
			items = append(items, cacheItem{path, entry})
		}
		// Sort by lastAccess (oldest first)
		// O(n^2) exchange sort; n is bounded by maxCacheSize so this stays small.
		for i := 0; i < len(items)-1; i++ {
			for j := i + 1; j < len(items); j++ {
				if items[i].entry.lastAccess.After(items[j].entry.lastAccess) {
					items[i], items[j] = items[j], items[i]
				}
			}
		}
		// Evict oldest entries until we're under the limit
		numToEvict := len(s.storageCache) - s.maxCacheSize
		for i := 0; i < numToEvict && i < len(items); i++ {
			toClose = append(toClose, items[i].entry.store)
			delete(s.storageCache, items[i].path)
		}
	}
	// Close connections outside of lock to avoid blocking
	// NOTE(review): fire-and-forget — nothing waits for this goroutine, so
	// closes may still be in flight at shutdown. Confirm that is acceptable.
	go func() {
		for _, store := range toClose {
			if err := store.Close(); err != nil {
				fmt.Fprintf(os.Stderr, "Warning: failed to close evicted storage: %v\n", err)
			}
		}
	}()
}
func (s *Server) handleConnection(conn net.Conn) {
defer conn.Close()
@@ -712,11 +830,13 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
}
// Check cache first
s.cacheMu.RLock()
cached, ok := s.storageCache[req.Cwd]
s.cacheMu.RUnlock()
if ok {
return cached, nil
s.cacheMu.Lock()
defer s.cacheMu.Unlock()
if entry, ok := s.storageCache[req.Cwd]; ok {
// Update last access time
entry.lastAccess = time.Now()
return entry.store, nil
}
// Find database for this cwd
@@ -731,10 +851,11 @@ func (s *Server) getStorageForRequest(req *Request) (storage.Storage, error) {
return nil, fmt.Errorf("failed to open database at %s: %w", dbPath, err)
}
// Cache it
s.cacheMu.Lock()
s.storageCache[req.Cwd] = store
s.cacheMu.Unlock()
// Cache it with current timestamp
s.storageCache[req.Cwd] = &StorageCacheEntry{
store: store,
lastAccess: time.Now(),
}
return store, nil
}
@@ -784,9 +905,9 @@ func (s *Server) handleReposList(_ *Request) Response {
defer s.cacheMu.RUnlock()
repos := make([]RepoInfo, 0, len(s.storageCache))
for path, store := range s.storageCache {
for path, entry := range s.storageCache {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
stats, err := store.GetStatistics(ctx)
stats, err := entry.store.GetStatistics(ctx)
cancel()
if err != nil {
continue
@@ -795,7 +916,7 @@ func (s *Server) handleReposList(_ *Request) Response {
// Extract prefix from a sample issue
filter := types.IssueFilter{Limit: 1}
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
issues, err := store.SearchIssues(ctx2, "", filter)
issues, err := entry.store.SearchIssues(ctx2, "", filter)
cancel2()
prefix := ""
if err == nil && len(issues) > 0 && len(issues[0].ID) > 0 {
@@ -813,7 +934,7 @@ func (s *Server) handleReposList(_ *Request) Response {
Path: path,
Prefix: prefix,
IssueCount: stats.TotalIssues,
LastAccess: "active",
LastAccess: entry.lastAccess.Format(time.RFC3339),
})
}
@@ -839,7 +960,7 @@ func (s *Server) handleReposReady(req *Request) Response {
if args.GroupByRepo {
result := make([]RepoReadyWork, 0, len(s.storageCache))
for path, store := range s.storageCache {
for path, entry := range s.storageCache {
filter := types.WorkFilter{
Status: types.StatusOpen,
Limit: args.Limit,
@@ -852,7 +973,7 @@ func (s *Server) handleReposReady(req *Request) Response {
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
issues, err := store.GetReadyWork(ctx, filter)
issues, err := entry.store.GetReadyWork(ctx, filter)
cancel()
if err != nil || len(issues) == 0 {
continue
@@ -873,7 +994,7 @@ func (s *Server) handleReposReady(req *Request) Response {
// Flat list of all ready issues across all repos
allIssues := make([]ReposReadyIssue, 0)
for path, store := range s.storageCache {
for path, entry := range s.storageCache {
filter := types.WorkFilter{
Status: types.StatusOpen,
Limit: args.Limit,
@@ -886,7 +1007,7 @@ func (s *Server) handleReposReady(req *Request) Response {
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
issues, err := store.GetReadyWork(ctx, filter)
issues, err := entry.store.GetReadyWork(ctx, filter)
cancel()
if err != nil {
continue
@@ -916,9 +1037,9 @@ func (s *Server) handleReposStats(_ *Request) Response {
perRepo := make(map[string]types.Statistics)
errors := make(map[string]string)
for path, store := range s.storageCache {
for path, entry := range s.storageCache {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
stats, err := store.GetStatistics(ctx)
stats, err := entry.store.GetStatistics(ctx)
cancel()
if err != nil {
errors[path] = err.Error()
@@ -957,10 +1078,10 @@ func (s *Server) handleReposClearCache(_ *Request) Response {
// to avoid holding lock during potentially slow Close() operations
s.cacheMu.Lock()
stores := make([]storage.Storage, 0, len(s.storageCache))
for _, store := range s.storageCache {
stores = append(stores, store)
for _, entry := range s.storageCache {
stores = append(stores, entry.store)
}
s.storageCache = make(map[string]storage.Storage)
s.storageCache = make(map[string]*StorageCacheEntry)
s.cacheMu.Unlock()
// Close all storage connections without holding lock

View File

@@ -0,0 +1,307 @@
package rpc
import (
	"os"
	"path/filepath"
	"strconv"
	"testing"
	"time"

	"github.com/steveyegge/beads/internal/storage/sqlite"
)
// TestStorageCacheEviction_TTL verifies that cache entries idle longer
// than cacheTTL are removed by evictStaleStorage.
func TestStorageCacheEviction_TTL(t *testing.T) {
	tmpDir := t.TempDir()

	// Main DB backing the server's default storage.
	mainDB := filepath.Join(tmpDir, "main.db")
	mainStore, err := sqlite.New(mainDB)
	if err != nil {
		t.Fatal(err)
	}
	defer mainStore.Close()

	// Short TTL so the test doesn't wait for the 30-minute default.
	socketPath := filepath.Join(tmpDir, "test.sock")
	server := NewServer(socketPath, mainStore)
	server.cacheTTL = 100 * time.Millisecond
	defer server.Stop()

	// Create two test databases. MkdirAll errors were previously ignored;
	// fail fast so later sqlite errors aren't misattributed.
	db1 := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
	if err := os.MkdirAll(filepath.Dir(db1), 0755); err != nil {
		t.Fatal(err)
	}
	store1, err := sqlite.New(db1)
	if err != nil {
		t.Fatal(err)
	}
	store1.Close()

	db2 := filepath.Join(tmpDir, "repo2", ".beads", "issues.db")
	if err := os.MkdirAll(filepath.Dir(db2), 0755); err != nil {
		t.Fatal(err)
	}
	store2, err := sqlite.New(db2)
	if err != nil {
		t.Fatal(err)
	}
	store2.Close()

	// Access both repos to populate the cache.
	req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
	if _, err := server.getStorageForRequest(req1); err != nil {
		t.Fatal(err)
	}
	req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")}
	if _, err := server.getStorageForRequest(req2); err != nil {
		t.Fatal(err)
	}

	// Verify both are cached.
	server.cacheMu.RLock()
	cacheSize := len(server.storageCache)
	server.cacheMu.RUnlock()
	if cacheSize != 2 {
		t.Fatalf("expected 2 cached entries, got %d", cacheSize)
	}

	// Wait for the TTL to expire, then force an eviction pass.
	time.Sleep(150 * time.Millisecond)
	server.evictStaleStorage()

	// Both entries should now be gone.
	server.cacheMu.RLock()
	cacheSize = len(server.storageCache)
	server.cacheMu.RUnlock()
	if cacheSize != 0 {
		t.Fatalf("expected 0 cached entries after TTL eviction, got %d", cacheSize)
	}
}
// TestStorageCacheEviction_LRU verifies that when the cache exceeds
// maxCacheSize, the least-recently accessed entry is evicted first.
func TestStorageCacheEviction_LRU(t *testing.T) {
	tmpDir := t.TempDir()

	// Main DB backing the server's default storage.
	mainDB := filepath.Join(tmpDir, "main.db")
	mainStore, err := sqlite.New(mainDB)
	if err != nil {
		t.Fatal(err)
	}
	defer mainStore.Close()

	// Small cache with a long TTL so only the LRU bound triggers eviction.
	socketPath := filepath.Join(tmpDir, "test.sock")
	server := NewServer(socketPath, mainStore)
	server.maxCacheSize = 2
	server.cacheTTL = 1 * time.Hour
	defer server.Stop()

	// Create three test databases (repo1..repo3). MkdirAll errors were
	// previously ignored; strconv.Itoa replaces the fragile rune math.
	for i := 1; i <= 3; i++ {
		dbPath := filepath.Join(tmpDir, "repo"+strconv.Itoa(i), ".beads", "issues.db")
		if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
			t.Fatal(err)
		}
		store, err := sqlite.New(dbPath)
		if err != nil {
			t.Fatal(err)
		}
		store.Close()
	}

	// Access repos 1 and 2, with a gap so their lastAccess times differ.
	req1 := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
	if _, err := server.getStorageForRequest(req1); err != nil {
		t.Fatal(err)
	}
	time.Sleep(10 * time.Millisecond) // ensure distinct timestamps
	req2 := &Request{Cwd: filepath.Join(tmpDir, "repo2")}
	if _, err := server.getStorageForRequest(req2); err != nil {
		t.Fatal(err)
	}

	// Verify both are cached.
	server.cacheMu.RLock()
	cacheSize := len(server.storageCache)
	server.cacheMu.RUnlock()
	if cacheSize != 2 {
		t.Fatalf("expected 2 cached entries, got %d", cacheSize)
	}

	// Accessing repo3 pushes the cache over maxCacheSize.
	req3 := &Request{Cwd: filepath.Join(tmpDir, "repo3")}
	if _, err := server.getStorageForRequest(req3); err != nil {
		t.Fatal(err)
	}

	// Enforce the size limit; repo1 (oldest) should be evicted.
	server.evictStaleStorage()

	server.cacheMu.RLock()
	cacheSize = len(server.storageCache)
	_, hasRepo1 := server.storageCache[filepath.Join(tmpDir, "repo1")]
	_, hasRepo2 := server.storageCache[filepath.Join(tmpDir, "repo2")]
	_, hasRepo3 := server.storageCache[filepath.Join(tmpDir, "repo3")]
	server.cacheMu.RUnlock()

	if cacheSize != 2 {
		t.Fatalf("expected 2 cached entries after LRU eviction, got %d", cacheSize)
	}
	if hasRepo1 {
		t.Error("repo1 should have been evicted (oldest)")
	}
	if !hasRepo2 {
		t.Error("repo2 should still be cached")
	}
	if !hasRepo3 {
		t.Error("repo3 should be cached")
	}
}
// TestStorageCacheEviction_LastAccessUpdate verifies that a cache hit in
// getStorageForRequest refreshes the entry's lastAccess timestamp.
func TestStorageCacheEviction_LastAccessUpdate(t *testing.T) {
	tmpDir := t.TempDir()

	// Main DB backing the server's default storage.
	mainDB := filepath.Join(tmpDir, "main.db")
	mainStore, err := sqlite.New(mainDB)
	if err != nil {
		t.Fatal(err)
	}
	defer mainStore.Close()

	socketPath := filepath.Join(tmpDir, "test.sock")
	server := NewServer(socketPath, mainStore)
	defer server.Stop()

	// Create a test database. MkdirAll error was previously ignored.
	dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
	if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
		t.Fatal(err)
	}
	store, err := sqlite.New(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	store.Close()

	// First access populates the cache.
	req := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
	if _, err := server.getStorageForRequest(req); err != nil {
		t.Fatal(err)
	}

	// Record the initial lastAccess time.
	server.cacheMu.RLock()
	entry := server.storageCache[filepath.Join(tmpDir, "repo1")]
	initialTime := entry.lastAccess
	server.cacheMu.RUnlock()

	// Wait so a refreshed timestamp is measurably later.
	time.Sleep(50 * time.Millisecond)

	// Second access should be a cache hit that bumps lastAccess.
	if _, err := server.getStorageForRequest(req); err != nil {
		t.Fatal(err)
	}

	server.cacheMu.RLock()
	entry = server.storageCache[filepath.Join(tmpDir, "repo1")]
	updatedTime := entry.lastAccess
	server.cacheMu.RUnlock()

	if !updatedTime.After(initialTime) {
		t.Errorf("lastAccess should be updated on cache hit, initial: %v, updated: %v", initialTime, updatedTime)
	}
}
// TestStorageCacheEviction_EnvVars verifies NewServer honors the
// BEADS_DAEMON_MAX_CACHE_SIZE and BEADS_DAEMON_CACHE_TTL env vars.
func TestStorageCacheEviction_EnvVars(t *testing.T) {
	tmpDir := t.TempDir()

	// Main DB backing the server's default storage.
	mainDB := filepath.Join(tmpDir, "main.db")
	mainStore, err := sqlite.New(mainDB)
	if err != nil {
		t.Fatal(err)
	}
	defer mainStore.Close()

	// t.Setenv restores any pre-existing values at test end, unlike the
	// previous os.Setenv + defer os.Unsetenv, which clobbered them.
	t.Setenv("BEADS_DAEMON_MAX_CACHE_SIZE", "100")
	t.Setenv("BEADS_DAEMON_CACHE_TTL", "1h30m")

	socketPath := filepath.Join(tmpDir, "test.sock")
	server := NewServer(socketPath, mainStore)
	defer server.Stop()

	// Verify the env-configured values were parsed.
	if server.maxCacheSize != 100 {
		t.Errorf("expected maxCacheSize=100, got %d", server.maxCacheSize)
	}
	expectedTTL := 90 * time.Minute
	if server.cacheTTL != expectedTTL {
		t.Errorf("expected cacheTTL=%v, got %v", expectedTTL, server.cacheTTL)
	}
}
// TestStorageCacheEviction_CleanupOnStop verifies Stop closes all cached
// storage connections and empties the cache.
func TestStorageCacheEviction_CleanupOnStop(t *testing.T) {
	tmpDir := t.TempDir()

	// Main DB backing the server's default storage.
	mainDB := filepath.Join(tmpDir, "main.db")
	mainStore, err := sqlite.New(mainDB)
	if err != nil {
		t.Fatal(err)
	}
	defer mainStore.Close()

	socketPath := filepath.Join(tmpDir, "test.sock")
	server := NewServer(socketPath, mainStore)

	// Create a test database and populate the cache.
	// MkdirAll error was previously ignored.
	dbPath := filepath.Join(tmpDir, "repo1", ".beads", "issues.db")
	if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
		t.Fatal(err)
	}
	store, err := sqlite.New(dbPath)
	if err != nil {
		t.Fatal(err)
	}
	store.Close()

	req := &Request{Cwd: filepath.Join(tmpDir, "repo1")}
	if _, err := server.getStorageForRequest(req); err != nil {
		t.Fatal(err)
	}

	// Verify the entry is cached before stopping.
	server.cacheMu.RLock()
	cacheSize := len(server.storageCache)
	server.cacheMu.RUnlock()
	if cacheSize != 1 {
		t.Fatalf("expected 1 cached entry, got %d", cacheSize)
	}

	// Stop must close and clear the cache.
	if err := server.Stop(); err != nil {
		t.Fatal(err)
	}

	server.cacheMu.RLock()
	cacheSize = len(server.storageCache)
	server.cacheMu.RUnlock()
	if cacheSize != 0 {
		t.Errorf("expected cache to be cleared on stop, got %d entries", cacheSize)
	}
}