Files
beads/internal/rpc/server_core.go
Steve Yegge 0c737025b5 Split internal/rpc/server.go into 8 focused modules (bd-215)
Refactored monolithic 2238-line server.go into 8 files with clear responsibilities:
- server_core.go: Server type, NewServer (115 lines)
- server_lifecycle_conn.go: Start/Stop/connection handling (248 lines)
- server_cache_storage.go: Storage caching and eviction (286 lines)
- server_routing_validation_diagnostics.go: Request routing/validation (384 lines)
- server_issues_epics.go: Issue CRUD operations (506 lines)
- server_labels_deps_comments.go: Labels/deps/comments (199 lines)
- server_compact.go: Compaction operations (287 lines)
- server_export_import_auto.go: Export/import operations (293 lines)

Improvements:
- Replaced RWMutex.TryLock with atomic.Bool for portable single-flight guard
- Added default storage close in Stop() to prevent FD leaks
- All methods remain on *Server receiver (no behavior changes)
- Each file <510 LOC for better maintainability
- All tests pass, daemon verified working

Amp-Thread-ID: https://ampcode.com/threads/T-92d481ad-1bda-4ecd-bcf5-874a1889db30
Co-authored-by: Amp <amp@ampcode.com>
2025-10-27 21:14:34 -07:00

113 lines
3.2 KiB
Go

package rpc
import (
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"time"
"github.com/steveyegge/beads/internal/storage"
)
// ServerVersion is the version of this RPC server
// This should match the bd CLI version for proper compatibility checks
// It's set dynamically by daemon.go from cmd/bd/version.go before starting the server
var ServerVersion = "0.0.0" // Placeholder; overridden by daemon startup
const (
statusUnhealthy = "unhealthy"
)
// Server represents the RPC server that runs in the daemon
type Server struct {
socketPath string
workspacePath string // Absolute path to workspace root
dbPath string // Absolute path to database file
storage storage.Storage // Default storage (for backward compat)
listener net.Listener
mu sync.RWMutex
shutdown bool
shutdownChan chan struct{}
stopOnce sync.Once
doneChan chan struct{} // closed when Start() cleanup is complete
// Per-request storage routing with eviction support
storageCache map[string]*StorageCacheEntry // repoRoot -> entry
cacheMu sync.RWMutex
maxCacheSize int
cacheTTL time.Duration
cleanupTicker *time.Ticker
// Health and metrics
startTime time.Time
lastActivityTime atomic.Value // time.Time - last request timestamp
cacheHits int64
cacheMisses int64
metrics *Metrics
// Connection limiting
maxConns int
activeConns int32 // atomic counter
connSemaphore chan struct{}
// Request timeout
requestTimeout time.Duration
// Ready channel signals when server is listening
readyChan chan struct{}
// Auto-import single-flight guard
importInProgress atomic.Bool
}
// NewServer creates a new RPC server
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
// Parse config from env vars
maxCacheSize := 50 // default
if env := os.Getenv("BEADS_DAEMON_MAX_CACHE_SIZE"); env != "" {
// Parse as integer
var size int
if _, err := fmt.Sscanf(env, "%d", &size); err == nil && size > 0 {
maxCacheSize = size
}
}
cacheTTL := 30 * time.Minute // default
if env := os.Getenv("BEADS_DAEMON_CACHE_TTL"); env != "" {
if ttl, err := time.ParseDuration(env); err == nil && ttl > 0 {
cacheTTL = ttl
}
}
maxConns := 100 // default
if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" {
var conns int
if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 {
maxConns = conns
}
}
requestTimeout := 30 * time.Second // default
if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" {
if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 {
requestTimeout = timeout
}
}
s := &Server{
socketPath: socketPath,
workspacePath: workspacePath,
dbPath: dbPath,
storage: store,
storageCache: make(map[string]*StorageCacheEntry),
maxCacheSize: maxCacheSize,
cacheTTL: cacheTTL,
shutdownChan: make(chan struct{}),
doneChan: make(chan struct{}),
startTime: time.Now(),
metrics: NewMetrics(),
maxConns: maxConns,
connSemaphore: make(chan struct{}, maxConns),
requestTimeout: requestTimeout,
readyChan: make(chan struct{}),
}
s.lastActivityTime.Store(time.Now())
return s
}