package rpc import ( "encoding/json" "fmt" "net" "os" "sync" "sync/atomic" "time" "github.com/steveyegge/beads/internal/storage" ) // ServerVersion is the version of this RPC server // This should match the bd CLI version for proper compatibility checks // It's set dynamically by daemon.go from cmd/bd/version.go before starting the server var ServerVersion = "0.0.0" // Placeholder; overridden by daemon startup const ( statusUnhealthy = "unhealthy" ) // Server represents the RPC server that runs in the daemon type Server struct { socketPath string workspacePath string // Absolute path to workspace root dbPath string // Absolute path to database file storage storage.Storage // Default storage (for backward compat) listener net.Listener mu sync.RWMutex shutdown bool shutdownChan chan struct{} stopOnce sync.Once doneChan chan struct{} // closed when Start() cleanup is complete // Health and metrics startTime time.Time lastActivityTime atomic.Value // time.Time - last request timestamp metrics *Metrics // Connection limiting maxConns int activeConns int32 // atomic counter connSemaphore chan struct{} // Request timeout requestTimeout time.Duration // Ready channel signals when server is listening readyChan chan struct{} // Auto-import single-flight guard importInProgress atomic.Bool // Mutation events for event-driven daemon mutationChan chan MutationEvent droppedEvents atomic.Int64 // Counter for dropped mutation events // Recent mutations buffer for polling (circular buffer, max 100 events) recentMutations []MutationEvent recentMutationsMu sync.RWMutex maxMutationBuffer int } // Mutation event types const ( MutationCreate = "create" MutationUpdate = "update" MutationDelete = "delete" MutationComment = "comment" ) // MutationEvent represents a database mutation for event-driven sync type MutationEvent struct { Type string // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment IssueID string // e.g., "bd-42" Timestamp time.Time } // NewServer creates a new RPC server func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server { // Parse config from env vars maxConns := 100 // default if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" { var conns int if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 { maxConns = conns } } requestTimeout := 30 * time.Second // default if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" { if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 { requestTimeout = timeout } } mutationBufferSize := 512 // default (increased from 100 for better burst handling) if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" { var bufSize int if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 { mutationBufferSize = bufSize } } s := &Server{ socketPath: socketPath, workspacePath: workspacePath, dbPath: dbPath, storage: store, shutdownChan: make(chan struct{}), doneChan: make(chan struct{}), startTime: time.Now(), metrics: NewMetrics(), maxConns: maxConns, connSemaphore: make(chan struct{}, maxConns), requestTimeout: requestTimeout, readyChan: make(chan struct{}), mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer recentMutations: make([]MutationEvent, 0, 100), maxMutationBuffer: 100, } s.lastActivityTime.Store(time.Now()) return s } // emitMutation sends a mutation event to the daemon's event-driven loop. // Non-blocking: drops event if channel is full (sync will happen eventually). // Also stores in recent mutations buffer for polling. func (s *Server) emitMutation(eventType, issueID string) { event := MutationEvent{ Type: eventType, IssueID: issueID, Timestamp: time.Now(), } // Send to mutation channel for daemon select { case s.mutationChan <- event: // Event sent successfully default: // Channel full, increment dropped events counter s.droppedEvents.Add(1) } // Store in recent mutations buffer for polling s.recentMutationsMu.Lock() s.recentMutations = append(s.recentMutations, event) // Keep buffer size limited (circular buffer behavior) if len(s.recentMutations) > s.maxMutationBuffer { s.recentMutations = s.recentMutations[1:] } s.recentMutationsMu.Unlock() } // MutationChan returns the mutation event channel for the daemon to consume func (s *Server) MutationChan() <-chan MutationEvent { return s.mutationChan } // ResetDroppedEventsCount resets the dropped events counter and returns the previous value func (s *Server) ResetDroppedEventsCount() int64 { return s.droppedEvents.Swap(0) } // GetRecentMutations returns mutations since the given timestamp func (s *Server) GetRecentMutations(sinceMillis int64) []MutationEvent { s.recentMutationsMu.RLock() defer s.recentMutationsMu.RUnlock() var result []MutationEvent for _, m := range s.recentMutations { if m.Timestamp.UnixMilli() > sinceMillis { result = append(result, m) } } return result } // handleGetMutations handles the get_mutations RPC operation func (s *Server) handleGetMutations(req *Request) Response { var args GetMutationsArgs if err := json.Unmarshal(req.Args, &args); err != nil { return Response{ Success: false, Error: fmt.Sprintf("invalid arguments: %v", err), } } mutations := s.GetRecentMutations(args.Since) data, _ := json.Marshal(mutations) return Response{ Success: true, Data: data, } }