Files
beads/internal/rpc/server_core.go
Steve Yegge e2bb4311f1 Improve mutation channel robustness
- Add event type constants (MutationCreate, MutationUpdate, MutationDelete, MutationComment)
- Make buffer size configurable via BEADS_MUTATION_BUFFER (default 512, up from 100)
- Add defensive closed-channel handling in event loop consumer
- Add 1s dropped-events ticker (down from 60s) for faster recovery
- Verify mutationChan is never closed (prevents panic)

Oracle review findings addressed:
- Eliminates panic risk from send-on-closed-channel
- Reduces worst-case recovery latency from 60s to 1s
- Increases buffer capacity for better burst handling
- Type-safe event constants prevent string typos

Related: bd-36320a04, bd-1f4086c5
2025-10-31 19:12:02 -07:00

139 lines
4.1 KiB
Go

package rpc
import (
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"time"
"github.com/steveyegge/beads/internal/storage"
)
// ServerVersion is the version of this RPC server
// This should match the bd CLI version for proper compatibility checks
// It's set dynamically by daemon.go from cmd/bd/version.go before starting the server
var ServerVersion = "0.0.0" // Placeholder; overridden by daemon startup
const (
statusUnhealthy = "unhealthy"
)
// Server represents the RPC server that runs in the daemon
type Server struct {
socketPath string
workspacePath string // Absolute path to workspace root
dbPath string // Absolute path to database file
storage storage.Storage // Default storage (for backward compat)
listener net.Listener
mu sync.RWMutex
shutdown bool
shutdownChan chan struct{}
stopOnce sync.Once
doneChan chan struct{} // closed when Start() cleanup is complete
// Health and metrics
startTime time.Time
lastActivityTime atomic.Value // time.Time - last request timestamp
metrics *Metrics
// Connection limiting
maxConns int
activeConns int32 // atomic counter
connSemaphore chan struct{}
// Request timeout
requestTimeout time.Duration
// Ready channel signals when server is listening
readyChan chan struct{}
// Auto-import single-flight guard
importInProgress atomic.Bool
// Mutation events for event-driven daemon
mutationChan chan MutationEvent
droppedEvents atomic.Int64 // Counter for dropped mutation events
}
// Mutation event types
const (
MutationCreate = "create"
MutationUpdate = "update"
MutationDelete = "delete"
MutationComment = "comment"
)
// MutationEvent represents a database mutation for event-driven sync
type MutationEvent struct {
Type string // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment
IssueID string // e.g., "bd-42"
Timestamp time.Time
}
// NewServer creates a new RPC server
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
// Parse config from env vars
maxConns := 100 // default
if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" {
var conns int
if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 {
maxConns = conns
}
}
requestTimeout := 30 * time.Second // default
if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" {
if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 {
requestTimeout = timeout
}
}
mutationBufferSize := 512 // default (increased from 100 for better burst handling)
if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
var bufSize int
if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
mutationBufferSize = bufSize
}
}
s := &Server{
socketPath: socketPath,
workspacePath: workspacePath,
dbPath: dbPath,
storage: store,
shutdownChan: make(chan struct{}),
doneChan: make(chan struct{}),
startTime: time.Now(),
metrics: NewMetrics(),
maxConns: maxConns,
connSemaphore: make(chan struct{}, maxConns),
requestTimeout: requestTimeout,
readyChan: make(chan struct{}),
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
}
s.lastActivityTime.Store(time.Now())
return s
}
// emitMutation sends a mutation event to the daemon's event-driven loop.
// Non-blocking: drops event if channel is full (sync will happen eventually).
func (s *Server) emitMutation(eventType, issueID string) {
select {
case s.mutationChan <- MutationEvent{
Type: eventType,
IssueID: issueID,
Timestamp: time.Now(),
}:
// Event sent successfully
default:
// Channel full, increment dropped events counter
s.droppedEvents.Add(1)
}
}
// MutationChan returns the mutation event channel for the daemon to consume
func (s *Server) MutationChan() <-chan MutationEvent {
return s.mutationChan
}
// ResetDroppedEventsCount resets the dropped events counter and returns the previous value
func (s *Server) ResetDroppedEventsCount() int64 {
return s.droppedEvents.Swap(0)
}