Improve mutation channel robustness
- Add event type constants (MutationCreate, MutationUpdate, MutationDelete, MutationComment) - Make buffer size configurable via BEADS_MUTATION_BUFFER (default 512, up from 100) - Add defensive closed-channel handling in event loop consumer - Add 1s dropped-events ticker (down from 60s) for faster recovery - Verify mutationChan is never closed (prevents panic) Oracle review findings addressed: - Eliminates panic risk from send-on-closed-channel - Reduces worst-case recovery latency from 60s to 1s - Increases buffer capacity for better burst handling - Type-safe event constants prevent string typos Related: bd-36320a04, bd-1f4086c5
This commit is contained in:
@@ -60,7 +60,12 @@ func runEventDrivenLoop(
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-mutationChan:
|
case event, ok := <-mutationChan:
|
||||||
|
if !ok {
|
||||||
|
// Channel closed (should never happen, but handle defensively)
|
||||||
|
log.log("Mutation channel closed; exiting listener")
|
||||||
|
return
|
||||||
|
}
|
||||||
log.log("Mutation detected: %s %s", event.Type, event.IssueID)
|
log.log("Mutation detected: %s %s", event.Type, event.IssueID)
|
||||||
exportDebouncer.Trigger()
|
exportDebouncer.Trigger()
|
||||||
|
|
||||||
@@ -70,23 +75,28 @@ func runEventDrivenLoop(
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Optional: Periodic health check and dropped events safety net
|
// Periodic health check
|
||||||
healthTicker := time.NewTicker(60 * time.Second)
|
healthTicker := time.NewTicker(60 * time.Second)
|
||||||
defer healthTicker.Stop()
|
defer healthTicker.Stop()
|
||||||
|
|
||||||
|
// Dropped events safety net (faster recovery than health check)
|
||||||
|
droppedEventsTicker := time.NewTicker(1 * time.Second)
|
||||||
|
defer droppedEventsTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-healthTicker.C:
|
case <-droppedEventsTicker.C:
|
||||||
// Periodic health validation (not sync)
|
// Check for dropped mutation events every second
|
||||||
checkDaemonHealth(ctx, store, log)
|
|
||||||
|
|
||||||
// Safety net: check for dropped mutation events
|
|
||||||
dropped := server.ResetDroppedEventsCount()
|
dropped := server.ResetDroppedEventsCount()
|
||||||
if dropped > 0 {
|
if dropped > 0 {
|
||||||
log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
|
log.log("WARNING: %d mutation events were dropped, triggering export", dropped)
|
||||||
exportDebouncer.Trigger()
|
exportDebouncer.Trigger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case <-healthTicker.C:
|
||||||
|
// Periodic health validation (not sync)
|
||||||
|
checkDaemonHealth(ctx, store, log)
|
||||||
|
|
||||||
case sig := <-sigChan:
|
case sig := <-sigChan:
|
||||||
if isReloadSignal(sig) {
|
if isReloadSignal(sig) {
|
||||||
log.log("Received reload signal, ignoring")
|
log.log("Received reload signal, ignoring")
|
||||||
|
|||||||
@@ -51,9 +51,17 @@ type Server struct {
|
|||||||
droppedEvents atomic.Int64 // Counter for dropped mutation events
|
droppedEvents atomic.Int64 // Counter for dropped mutation events
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mutation event types
|
||||||
|
const (
|
||||||
|
MutationCreate = "create"
|
||||||
|
MutationUpdate = "update"
|
||||||
|
MutationDelete = "delete"
|
||||||
|
MutationComment = "comment"
|
||||||
|
)
|
||||||
|
|
||||||
// MutationEvent represents a database mutation for event-driven sync
|
// MutationEvent represents a database mutation for event-driven sync
|
||||||
type MutationEvent struct {
|
type MutationEvent struct {
|
||||||
Type string // "create", "update", "delete", "comment"
|
Type string // One of: MutationCreate, MutationUpdate, MutationDelete, MutationComment
|
||||||
IssueID string // e.g., "bd-42"
|
IssueID string // e.g., "bd-42"
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
@@ -76,6 +84,14 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mutationBufferSize := 512 // default (increased from 100 for better burst handling)
|
||||||
|
if env := os.Getenv("BEADS_MUTATION_BUFFER"); env != "" {
|
||||||
|
var bufSize int
|
||||||
|
if _, err := fmt.Sscanf(env, "%d", &bufSize); err == nil && bufSize > 0 {
|
||||||
|
mutationBufferSize = bufSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
socketPath: socketPath,
|
socketPath: socketPath,
|
||||||
workspacePath: workspacePath,
|
workspacePath: workspacePath,
|
||||||
@@ -89,7 +105,7 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
|||||||
connSemaphore: make(chan struct{}, maxConns),
|
connSemaphore: make(chan struct{}, maxConns),
|
||||||
requestTimeout: requestTimeout,
|
requestTimeout: requestTimeout,
|
||||||
readyChan: make(chan struct{}),
|
readyChan: make(chan struct{}),
|
||||||
mutationChan: make(chan MutationEvent, 100), // Buffered to avoid blocking
|
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
|
||||||
}
|
}
|
||||||
s.lastActivityTime.Store(time.Now())
|
s.lastActivityTime.Store(time.Now())
|
||||||
return s
|
return s
|
||||||
|
|||||||
@@ -161,7 +161,7 @@ func (s *Server) handleCreate(req *Request) Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("create", issue.ID)
|
s.emitMutation(MutationCreate, issue.ID)
|
||||||
|
|
||||||
data, _ := json.Marshal(issue)
|
data, _ := json.Marshal(issue)
|
||||||
return Response{
|
return Response{
|
||||||
@@ -195,7 +195,7 @@ func (s *Server) handleUpdate(req *Request) Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("update", updateArgs.ID)
|
s.emitMutation(MutationUpdate, updateArgs.ID)
|
||||||
|
|
||||||
issue, err := store.GetIssue(ctx, updateArgs.ID)
|
issue, err := store.GetIssue(ctx, updateArgs.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -232,7 +232,7 @@ func (s *Server) handleClose(req *Request) Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("update", closeArgs.ID)
|
s.emitMutation(MutationUpdate, closeArgs.ID)
|
||||||
|
|
||||||
issue, _ := store.GetIssue(ctx, closeArgs.ID)
|
issue, _ := store.GetIssue(ctx, closeArgs.ID)
|
||||||
data, _ := json.Marshal(issue)
|
data, _ := json.Marshal(issue)
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ func (s *Server) handleDepAdd(req *Request) Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("update", depArgs.FromID)
|
s.emitMutation(MutationUpdate, depArgs.FromID)
|
||||||
|
|
||||||
return Response{Success: true}
|
return Response{Success: true}
|
||||||
}
|
}
|
||||||
@@ -61,7 +61,7 @@ func (s *Server) handleSimpleStoreOp(req *Request, argsPtr interface{}, argDesc
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("update", issueID)
|
s.emitMutation(MutationUpdate, issueID)
|
||||||
|
|
||||||
return Response{Success: true}
|
return Response{Success: true}
|
||||||
}
|
}
|
||||||
@@ -135,7 +135,7 @@ func (s *Server) handleCommentAdd(req *Request) Response {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Emit mutation event for event-driven daemon
|
// Emit mutation event for event-driven daemon
|
||||||
s.emitMutation("comment", commentArgs.ID)
|
s.emitMutation(MutationComment, commentArgs.ID)
|
||||||
|
|
||||||
data, _ := json.Marshal(comment)
|
data, _ := json.Marshal(comment)
|
||||||
return Response{
|
return Response{
|
||||||
|
|||||||
Reference in New Issue
Block a user