Files
beads/internal/rpc/server_core.go
Steve Yegge 3405f0c684 feat(rpc): enrich MutationEvent with Title and Assignee fields
Add Title and Assignee fields to MutationEvent struct so activity feeds
can display meaningful context without extra lookups. Updated emitMutation
signature to accept these values and modified all callers:

- Create: passes issue.Title and issue.Assignee directly
- Update/Close: moved emitMutation after GetIssue to access enriched data
- Delete: uses existing issue lookup before deletion
- Dep/Label/Comment ops: passes empty strings (would require extra lookup)

Fixes bd-gqxd

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-23 17:09:56 -08:00

235 lines
7.5 KiB
Go

package rpc
import (
"encoding/json"
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"time"
"github.com/steveyegge/beads/internal/storage"
)
// ServerVersion is the version of this RPC server
// This should match the bd CLI version for proper compatibility checks
// It's set dynamically by daemon.go from cmd/bd/version.go before starting the server
var ServerVersion = "0.0.0" // Placeholder; overridden by daemon startup
const (
statusUnhealthy = "unhealthy"
)
// Server represents the RPC server that runs in the daemon
type Server struct {
socketPath string
workspacePath string // Absolute path to workspace root
dbPath string // Absolute path to database file
storage storage.Storage // Default storage (for backward compat)
listener net.Listener
mu sync.RWMutex
shutdown bool
shutdownChan chan struct{}
stopOnce sync.Once
doneChan chan struct{} // closed when Start() cleanup is complete
// Health and metrics
startTime time.Time
lastActivityTime atomic.Value // time.Time - last request timestamp
metrics *Metrics
// Connection limiting
maxConns int
activeConns int32 // atomic counter
connSemaphore chan struct{}
// Request timeout
requestTimeout time.Duration
// Ready channel signals when server is listening
readyChan chan struct{}
// Auto-import single-flight guard
importInProgress atomic.Bool
// Mutation events for event-driven daemon
mutationChan chan MutationEvent
droppedEvents atomic.Int64 // Counter for dropped mutation events
// Recent mutations buffer for polling (circular buffer, max 100 events)
recentMutations []MutationEvent
recentMutationsMu sync.RWMutex
maxMutationBuffer int
// Daemon configuration (set via SetConfig after creation)
autoCommit bool
autoPush bool
autoPull bool
localMode bool
syncInterval string
daemonMode string
}
// Mutation event types
const (
MutationCreate = "create"
MutationUpdate = "update"
MutationDelete = "delete"
MutationComment = "comment"
// Molecule-specific event types for activity feed
MutationBonded = "bonded" // Molecule bonded to parent (dynamic bond)
MutationSquashed = "squashed" // Wisp squashed to digest
MutationBurned = "burned" // Wisp discarded without digest
MutationStatus = "status" // Status change (in_progress, completed, failed)
)
// MutationEvent represents a database mutation for event-driven sync
type MutationEvent struct {
Type string // One of the Mutation* constants
IssueID string // e.g., "bd-42"
Title string // Issue title for display context (may be empty for some operations)
Assignee string // Issue assignee for display context (may be empty)
Timestamp time.Time
// Optional metadata for richer events (used by status, bonded, etc.)
OldStatus string `json:"old_status,omitempty"` // Previous status (for status events)
NewStatus string `json:"new_status,omitempty"` // New status (for status events)
ParentID string `json:"parent_id,omitempty"` // Parent molecule (for bonded events)
StepCount int `json:"step_count,omitempty"` // Number of steps (for bonded events)
}
// NewServer creates a new RPC server
func NewServer(socketPath string, store storage.Storage, workspacePath string, dbPath string) *Server {
// Parse config from env vars
maxConns := 100 // default
if env := os.Getenv("BEADS_DAEMON_MAX_CONNS"); env != "" {
var conns int
if _, err := fmt.Sscanf(env, "%d", &conns); err == nil && conns > 0 {
maxConns = conns
}
}
requestTimeout := 30 * time.Second // default
if env := os.Getenv("BEADS_DAEMON_REQUEST_TIMEOUT"); env != "" {
if timeout, err := time.ParseDuration(env); err == nil && timeout > 0 {
requestTimeout = timeout
}
}
mutationBufferSize := 512 // default (increased from 100 for better burst handling)
if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
var bufSize int
if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
mutationBufferSize = bufSize
}
}
s := &Server{
socketPath: socketPath,
workspacePath: workspacePath,
dbPath: dbPath,
storage: store,
shutdownChan: make(chan struct{}),
doneChan: make(chan struct{}),
startTime: time.Now(),
metrics: NewMetrics(),
maxConns: maxConns,
connSemaphore: make(chan struct{}, maxConns),
requestTimeout: requestTimeout,
readyChan: make(chan struct{}),
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
recentMutations: make([]MutationEvent, 0, 100),
maxMutationBuffer: 100,
}
s.lastActivityTime.Store(time.Now())
return s
}
// emitMutation sends a mutation event to the daemon's event-driven loop.
// Non-blocking: drops event if channel is full (sync will happen eventually).
// Also stores in recent mutations buffer for polling.
// Title and assignee provide context for activity feeds; pass empty strings if unknown.
func (s *Server) emitMutation(eventType, issueID, title, assignee string) {
s.emitRichMutation(MutationEvent{
Type: eventType,
IssueID: issueID,
Title: title,
Assignee: assignee,
})
}
// emitRichMutation sends a pre-built mutation event with optional metadata.
// Use this for events that include additional context (status changes, bonded events, etc.)
// Non-blocking: drops event if channel is full (sync will happen eventually).
func (s *Server) emitRichMutation(event MutationEvent) {
// Always set timestamp if not provided
if event.Timestamp.IsZero() {
event.Timestamp = time.Now()
}
// Send to mutation channel for daemon
select {
case s.mutationChan <- event:
// Event sent successfully
default:
// Channel full, increment dropped events counter
s.droppedEvents.Add(1)
}
// Store in recent mutations buffer for polling
s.recentMutationsMu.Lock()
s.recentMutations = append(s.recentMutations, event)
// Keep buffer size limited (circular buffer behavior)
if len(s.recentMutations) > s.maxMutationBuffer {
s.recentMutations = s.recentMutations[1:]
}
s.recentMutationsMu.Unlock()
}
// MutationChan returns the mutation event channel for the daemon to consume
func (s *Server) MutationChan() <-chan MutationEvent {
return s.mutationChan
}
// SetConfig sets the daemon configuration for status reporting
func (s *Server) SetConfig(autoCommit, autoPush, autoPull, localMode bool, syncInterval, daemonMode string) {
s.mu.Lock()
defer s.mu.Unlock()
s.autoCommit = autoCommit
s.autoPush = autoPush
s.autoPull = autoPull
s.localMode = localMode
s.syncInterval = syncInterval
s.daemonMode = daemonMode
}
// ResetDroppedEventsCount resets the dropped events counter and returns the previous value
func (s *Server) ResetDroppedEventsCount() int64 {
return s.droppedEvents.Swap(0)
}
// GetRecentMutations returns mutations since the given timestamp
func (s *Server) GetRecentMutations(sinceMillis int64) []MutationEvent {
s.recentMutationsMu.RLock()
defer s.recentMutationsMu.RUnlock()
var result []MutationEvent
for _, m := range s.recentMutations {
if m.Timestamp.UnixMilli() > sinceMillis {
result = append(result, m)
}
}
return result
}
// handleGetMutations handles the get_mutations RPC operation
func (s *Server) handleGetMutations(req *Request) Response {
var args GetMutationsArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
return Response{
Success: false,
Error: fmt.Sprintf("invalid arguments: %v", err),
}
}
mutations := s.GetRecentMutations(args.Since)
data, _ := json.Marshal(mutations)
return Response{
Success: true,
Data: data,
}
}