- Add emitRichMutation() function for events with metadata - handleClose now emits MutationStatus with old/new status - handleUpdate detects status changes and emits MutationStatus - Add comprehensive tests for rich mutation events Also: - Add activity.go test coverage (bd-3jcw): - Tests for parseDurationString, filterEvents, formatEvent - Tests for all mutation type displays - Fix silent error handling in --follow mode (bd-csnr): - Track consecutive daemon failures - Show warning after 5 failures (rate-limited to 30s) - Show reconnection message on recovery 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
230 lines
7.2 KiB
Go
230 lines
7.2 KiB
Go
package rpc
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/steveyegge/beads/internal/storage"
|
|
)
|
|
|
|
// ServerVersion reports the version of this RPC server.
//
// It is expected to equal the bd CLI version so that compatibility checks
// between client and daemon behave correctly. The literal below is only a
// placeholder: daemon.go assigns the real value (taken from
// cmd/bd/version.go) before the server is started.
var ServerVersion = "0.0.0"
|
|
|
|
// statusUnhealthy is the literal status string "unhealthy".
// NOTE(review): presumably reported by the health-check path elsewhere in
// this package — confirm against its users.
const statusUnhealthy = "unhealthy"
|
|
|
|
// Server represents the RPC server that runs in the daemon
|
|
type Server struct {
|
|
socketPath string
|
|
workspacePath string // Absolute path to workspace root
|
|
dbPath string // Absolute path to database file
|
|
storage storage.Storage // Default storage (for backward compat)
|
|
listener net.Listener
|
|
mu sync.RWMutex
|
|
shutdown bool
|
|
shutdownChan chan struct{}
|
|
stopOnce sync.Once
|
|
doneChan chan struct{} // closed when Start() cleanup is complete
|
|
// Health and metrics
|
|
startTime time.Time
|
|
lastActivityTime atomic.Value // time.Time - last request timestamp
|
|
metrics *Metrics
|
|
// Connection limiting
|
|
maxConns int
|
|
activeConns int32 // atomic counter
|
|
connSemaphore chan struct{}
|
|
// Request timeout
|
|
requestTimeout time.Duration
|
|
// Ready channel signals when server is listening
|
|
readyChan chan struct{}
|
|
// Auto-import single-flight guard
|
|
importInProgress atomic.Bool
|
|
// Mutation events for event-driven daemon
|
|
mutationChan chan MutationEvent
|
|
droppedEvents atomic.Int64 // Counter for dropped mutation events
|
|
// Recent mutations buffer for polling (circular buffer, max 100 events)
|
|
recentMutations []MutationEvent
|
|
recentMutationsMu sync.RWMutex
|
|
maxMutationBuffer int
|
|
// Daemon configuration (set via SetConfig after creation)
|
|
autoCommit bool
|
|
autoPush bool
|
|
autoPull bool
|
|
localMode bool
|
|
syncInterval string
|
|
daemonMode string
|
|
}
|
|
|
|
// Values accepted in MutationEvent.Type.
const (
	// Basic issue mutations.
	MutationCreate  = "create"
	MutationUpdate  = "update"
	MutationDelete  = "delete"
	MutationComment = "comment"

	// Molecule-specific events surfaced in the activity feed.

	// MutationBonded: a molecule was bonded to a parent (dynamic bond).
	MutationBonded = "bonded"
	// MutationSquashed: a wisp was squashed into a digest.
	MutationSquashed = "squashed"
	// MutationBurned: a wisp was discarded without producing a digest.
	MutationBurned = "burned"
	// MutationStatus: a status change (in_progress, completed, failed).
	MutationStatus = "status"
)
|
|
|
|
// MutationEvent describes a single database mutation. Events drive the
// daemon's event-driven sync and are also kept in the server's recent
// mutation history for polling clients.
type MutationEvent struct {
	// Type is one of the Mutation* constants.
	Type string
	// IssueID identifies the affected issue, e.g. "bd-42".
	IssueID string
	// Timestamp is when the mutation was emitted; emitRichMutation fills it
	// in with the current time when it is left zero.
	Timestamp time.Time

	// Optional metadata carried by richer events (status, bonded, ...).
	// Empty values are omitted from the JSON encoding.

	// OldStatus is the previous status (status events).
	OldStatus string `json:"old_status,omitempty"`
	// NewStatus is the new status (status events).
	NewStatus string `json:"new_status,omitempty"`
	// ParentID is the parent molecule (bonded events).
	ParentID string `json:"parent_id,omitempty"`
	// StepCount is the number of steps (bonded events).
	StepCount int `json:"step_count,omitempty"`
}
|
|
|
|
// NewServer creates a new RPC server
|
|
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
|
|
// Parse config from env vars
|
|
maxConns := 100 // default
|
|
if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" {
|
|
var conns int
|
|
if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 {
|
|
maxConns = conns
|
|
}
|
|
}
|
|
|
|
requestTimeout := 30 * time.Second // default
|
|
if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" {
|
|
if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 {
|
|
requestTimeout = timeout
|
|
}
|
|
}
|
|
|
|
mutationBufferSize := 512 // default (increased from 100 for better burst handling)
|
|
if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
|
|
var bufSize int
|
|
if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
|
|
mutationBufferSize = bufSize
|
|
}
|
|
}
|
|
|
|
s := &Server{
|
|
socketPath: socketPath,
|
|
workspacePath: workspacePath,
|
|
dbPath: dbPath,
|
|
storage: store,
|
|
shutdownChan: make(chan struct{}),
|
|
doneChan: make(chan struct{}),
|
|
startTime: time.Now(),
|
|
metrics: NewMetrics(),
|
|
maxConns: maxConns,
|
|
connSemaphore: make(chan struct{}, maxConns),
|
|
requestTimeout: requestTimeout,
|
|
readyChan: make(chan struct{}),
|
|
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
|
|
recentMutations: make([]MutationEvent, 0, 100),
|
|
maxMutationBuffer: 100,
|
|
}
|
|
s.lastActivityTime.Store(time.Now())
|
|
return s
|
|
}
|
|
|
|
// emitMutation sends a mutation event to the daemon's event-driven loop.
|
|
// Non-blocking: drops event if channel is full (sync will happen eventually).
|
|
// Also stores in recent mutations buffer for polling.
|
|
func (s *Server) emitMutation(eventType, issueID string) {
|
|
s.emitRichMutation(MutationEvent{
|
|
Type: eventType,
|
|
IssueID: issueID,
|
|
})
|
|
}
|
|
|
|
// emitRichMutation sends a pre-built mutation event with optional metadata.
|
|
// Use this for events that include additional context (status changes, bonded events, etc.)
|
|
// Non-blocking: drops event if channel is full (sync will happen eventually).
|
|
func (s *Server) emitRichMutation(event MutationEvent) {
|
|
// Always set timestamp if not provided
|
|
if event.Timestamp.IsZero() {
|
|
event.Timestamp = time.Now()
|
|
}
|
|
|
|
// Send to mutation channel for daemon
|
|
select {
|
|
case s.mutationChan <- event:
|
|
// Event sent successfully
|
|
default:
|
|
// Channel full, increment dropped events counter
|
|
s.droppedEvents.Add(1)
|
|
}
|
|
|
|
// Store in recent mutations buffer for polling
|
|
s.recentMutationsMu.Lock()
|
|
s.recentMutations = append(s.recentMutations, event)
|
|
// Keep buffer size limited (circular buffer behavior)
|
|
if len(s.recentMutations) > s.maxMutationBuffer {
|
|
s.recentMutations = s.recentMutations[1:]
|
|
}
|
|
s.recentMutationsMu.Unlock()
|
|
}
|
|
|
|
// MutationChan returns the mutation event channel for the daemon to consume
|
|
func (s *Server) MutationChan() <-chan MutationEvent {
|
|
return s.mutationChan
|
|
}
|
|
|
|
// SetConfig sets the daemon configuration for status reporting
|
|
func (s *Server) SetConfig(autoCommit, autoPush, autoPull, localMode bool, syncInterval, daemonMode string) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.autoCommit = autoCommit
|
|
s.autoPush = autoPush
|
|
s.autoPull = autoPull
|
|
s.localMode = localMode
|
|
s.syncInterval = syncInterval
|
|
s.daemonMode = daemonMode
|
|
}
|
|
|
|
// ResetDroppedEventsCount resets the dropped events counter and returns the previous value
|
|
func (s *Server) ResetDroppedEventsCount() int64 {
|
|
return s.droppedEvents.Swap(0)
|
|
}
|
|
|
|
// GetRecentMutations returns mutations since the given timestamp
|
|
func (s *Server) GetRecentMutations(sinceMillis int64) []MutationEvent {
|
|
s.recentMutationsMu.RLock()
|
|
defer s.recentMutationsMu.RUnlock()
|
|
|
|
var result []MutationEvent
|
|
for _, m := range s.recentMutations {
|
|
if m.Timestamp.UnixMilli() > sinceMillis {
|
|
result = append(result, m)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
// handleGetMutations handles the get_mutations RPC operation
|
|
func (s *Server) handleGetMutations(req *Request) Response {
|
|
var args GetMutationsArgs
|
|
if err := json.Unmarshal(req.Args, &args); err != nil {
|
|
return Response{
|
|
Success: false,
|
|
Error: fmt.Sprintf("invalid arguments: %v", err),
|
|
}
|
|
}
|
|
|
|
mutations := s.GetRecentMutations(args.Since)
|
|
data, _ := json.Marshal(mutations)
|
|
|
|
return Response{
|
|
Success: true,
|
|
Data: data,
|
|
}
|
|
}
|