From e2bb4311f1480eccdec40ace12ddf1362a7565c6 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Fri, 31 Oct 2025 19:11:49 -0700 Subject: [PATCH] Improve mutation channel robustness - Add event type constants (MutationCreate, MutationUpdate, MutationDelete, MutationComment) - Make buffer size configurable via BEADS_MUTATION_BUFFER (default 512, up from 100) - Add defensive closed-channel handling in event loop consumer - Add 1s dropped-events ticker (down from 60s) for faster recovery - Verify mutationChan is never closed (prevents panic) Oracle review findings addressed: - Eliminates panic risk from send-on-closed-channel - Reduces worst-case recovery latency from 60s to 1s - Increases buffer capacity for better burst handling - Type-safe event constants prevent string typos Related: bd-36320a04, bd-1f4086c5 --- cmd/bd/daemon_event_loop.go | 24 +++++++++++++++------ internal/rpc/server_core.go | 20 +++++++++++++++-- internal/rpc/server_issues_epics.go | 6 +++--- internal/rpc/server_labels_deps_comments.go | 6 +++--- 4 files changed, 41 insertions(+), 15 deletions(-) diff --git a/cmd/bd/daemon_event_loop.go b/cmd/bd/daemon_event_loop.go index da7abf42..9c33b18c 100644 --- a/cmd/bd/daemon_event_loop.go +++ b/cmd/bd/daemon_event_loop.go @@ -60,7 +60,12 @@ func runEventDrivenLoop( go func() { for { select { - case event := <-mutationChan: + case event, ok := <-mutationChan: + if !ok { + // Channel closed (should never happen, but handle defensively) + log.log("Mutation channel closed; exiting listener") + return + } log.log("Mutation detected: %s %s", event.Type, event.IssueID) exportDebouncer.Trigger() @@ -70,23 +75,28 @@ func runEventDrivenLoop( } }() - // Optional: Periodic health check and dropped events safety net + // Periodic health check healthTicker := time.NewTicker(60 * time.Second) defer healthTicker.Stop() + // Dropped events safety net (faster recovery than health check) + droppedEventsTicker := time.NewTicker(1 * time.Second) + defer droppedEventsTicker.Stop() + for { select { - case <-healthTicker.C: - // Periodic health validation (not sync) - checkDaemonHealth(ctx, store, log) - - // Safety net: check for dropped mutation events + case <-droppedEventsTicker.C: + // Check for dropped mutation events every second dropped := server.ResetDroppedEventsCount() if dropped > 0 { log.log("WARNING: %d mutation events were dropped, triggering export", dropped) exportDebouncer.Trigger() } + case <-healthTicker.C: + // Periodic health validation (not sync) + checkDaemonHealth(ctx, store, log) + case sig := <-sigChan: if isReloadSignal(sig) { log.log("Received reload signal, ignoring") diff --git a/internal/rpc/server_core.go b/internal/rpc/server_core.go index c1f9a57d..7c059ab2 100644 --- a/internal/rpc/server_core.go +++ b/internal/rpc/server_core.go @@ -51,9 +51,17 @@ type Server struct { droppedEvents atomic.Int64 // Counter for dropped mutation events } +// Mutation event types +const ( + MutationCreate = "create" + MutationUpdate = "update" + MutationDelete = "delete" + MutationComment = "comment" +) + // MutationEvent represents a database mutation for event-driven sync type MutationEvent struct { - Type string // "create", "update", "delete", "comment" + Type string // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment IssueID string // e.g., "bd-42" Timestamp time.Time } @@ -76,6 +84,14 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d } } + mutationBufferSize := 512 // default (increased from 100 for better burst handling) + if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" { + var bufSize int + if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 { + mutationBufferSize = bufSize + } + } + s := &Server{ socketPath: socketPath, workspacePath: workspacePath, @@ -89,7 +105,7 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d connSemaphore: make(chan struct{}, maxConns), requestTimeout: requestTimeout, readyChan: make(chan struct{}), - mutationChan: make(chan MutationEvent, 100), // Buffered to avoid blocking + mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer } s.lastActivityTime.Store(time.Now()) return s diff --git a/internal/rpc/server_issues_epics.go b/internal/rpc/server_issues_epics.go index 42e0f253..6bf6bb72 100644 --- a/internal/rpc/server_issues_epics.go +++ b/internal/rpc/server_issues_epics.go @@ -161,7 +161,7 @@ func (s *Server) handleCreate(req *Request) Response { } // Emit mutation event for event-driven daemon - s.emitMutation("create", issue.ID) + s.emitMutation(MutationCreate, issue.ID) data, _ := json.Marshal(issue) return Response{ @@ -195,7 +195,7 @@ func (s *Server) handleUpdate(req *Request) Response { } // Emit mutation event for event-driven daemon - s.emitMutation("update", updateArgs.ID) + s.emitMutation(MutationUpdate, updateArgs.ID) issue, err := store.GetIssue(ctx, updateArgs.ID) if err != nil { @@ -232,7 +232,7 @@ func (s *Server) handleClose(req *Request) Response { } // Emit mutation event for event-driven daemon - s.emitMutation("update", closeArgs.ID) + s.emitMutation(MutationUpdate, closeArgs.ID) issue, _ := store.GetIssue(ctx, closeArgs.ID) data, _ := json.Marshal(issue) diff --git a/internal/rpc/server_labels_deps_comments.go b/internal/rpc/server_labels_deps_comments.go index 33b4cf93..345ed280 100644 --- a/internal/rpc/server_labels_deps_comments.go +++ b/internal/rpc/server_labels_deps_comments.go @@ -35,7 +35,7 @@ func (s *Server) handleDepAdd(req *Request) Response { } // Emit mutation event for event-driven daemon - s.emitMutation("update", depArgs.FromID) + s.emitMutation(MutationUpdate, depArgs.FromID) return Response{Success: true} } @@ -61,7 +61,7 @@ func (s *Server) handleSimpleStoreOp(req *Request, argsPtr interface{}, argDesc } // Emit mutation event for event-driven daemon - s.emitMutation("update", issueID) + s.emitMutation(MutationUpdate, issueID) return Response{Success: true} } @@ -135,7 +135,7 @@ func (s *Server) handleCommentAdd(req *Request) Response { } // Emit mutation event for event-driven daemon - s.emitMutation("comment", commentArgs.ID) + s.emitMutation(MutationComment, commentArgs.ID) data, _ := json.Marshal(comment) return Response{