Implementing an RPC monitoring solution with a web-ui as implementation example. (#244)
* bd sync: 2025-10-30 12:12:27 * Working on frontend * bd sync: 2025-11-06 16:55:55 * feat: finish bd monitor human viewer * Merge conflicts resolved and added tests * bd sync: 2025-11-06 17:23:41 * bd sync: 2025-11-06 17:34:52 * feat: Add reload button and multiselect status filter to monitor - Changed status filter from single select to multiselect with 'Open' selected by default - Added reload button with visual feedback (hover/active states) - Updated filterIssues() to handle multiple selected statuses - Added reloadData() function that reloads both stats and issues - Improved responsive design for mobile devices - Filter controls now use flexbox layout with better spacing * fix: Update monitor statistics to show Total, In Progress, Open, Closed - Replaced 'Ready to Work' stat with 'In Progress' stat - Reordered stats to show logical progression: Total -> In Progress -> Open -> Closed - Updated loadStats() to fetch in-progress count from stats API - Removed unnecessary separate API call for ready count * fix: Correct API field names in monitor stats JavaScript The JavaScript was using incorrect field names (stats.total, stats.by_status) that don't match the actual types.Statistics struct which uses flat fields with underscores (total_issues, in_progress_issues, etc). Fixed by updating loadStats() to use correct field names: - stats.total -> stats.total_issues - stats.by_status?.['in-progress'] -> stats.in_progress_issues - stats.by_status?.open -> stats.open_issues - stats.by_status?.closed -> stats.closed_issues Fixes beads-9 * bd sync: 2025-11-06 17:51:24 * bd sync: 2025-11-06 17:56:09 * fix: Make monitor require daemon to prevent SQLite locking Implemented Option 1 from beads-eel: monitor now requires daemon and never opens direct SQLite connection. Changes: - Added 'monitor' to noDbCommands list in main.go to skip normal DB initialization - Added validateDaemonForMonitor() PreRun function that: - Finds database path using beads.FindDatabasePath() - Validates daemon is running and healthy - Fails gracefully with clear error message if no daemon - Only uses RPC connection, never opens SQLite directly Benefits: - Eliminates SQLite locking conflicts between monitor and daemon - Users can now close/update issues via CLI while monitor runs - Clear error messages guide users to start daemon first Fixes beads-eel * bd sync: 2025-11-06 18:03:50 * docs: Add bd daemons restart subcommand documentation Added documentation for the 'bd daemons restart' subcommand across all documentation files: - commands/daemons.md: Added full restart subcommand section with synopsis, description, arguments, flags, and examples - README.md: Added restart examples to daemon management section - AGENTS.md: Added restart examples with --json flag for agents The restart command gracefully stops and starts a specific daemon by workspace path or PID, useful after upgrading bd or when a daemon needs refreshing. Fixes beads-11 * bd sync: 2025-11-06 18:13:16 * Separated the web ui from the general monitoring functionality --------- Co-authored-by: Steve Yegge <stevey@sourcegraph.com>
This commit is contained in:
@@ -267,6 +267,11 @@ func (c *Client) Stats() (*Response, error) {
|
||||
return c.Execute(OpStats, nil)
|
||||
}
|
||||
|
||||
// GetMutations retrieves recent mutations from the daemon
|
||||
func (c *Client) GetMutations(args *GetMutationsArgs) (*Response, error) {
|
||||
return c.Execute(OpGetMutations, args)
|
||||
}
|
||||
|
||||
// AddDependency adds a dependency via the daemon
|
||||
func (c *Client) AddDependency(args *DepAddArgs) (*Response, error) {
|
||||
return c.Execute(OpDepAdd, args)
|
||||
|
||||
@@ -33,6 +33,7 @@ const (
|
||||
OpExport = "export"
|
||||
OpImport = "import"
|
||||
OpEpicStatus = "epic_status"
|
||||
OpGetMutations = "get_mutations"
|
||||
OpShutdown = "shutdown"
|
||||
)
|
||||
|
||||
@@ -315,3 +316,8 @@ type ExportArgs struct {
|
||||
type ImportArgs struct {
|
||||
JSONLPath string `json:"jsonl_path"` // Path to import JSONL file
|
||||
}
|
||||
|
||||
// GetMutationsArgs represents arguments for retrieving recent mutations
|
||||
type GetMutationsArgs struct {
|
||||
Since int64 `json:"since"` // Unix timestamp in milliseconds (0 for all recent)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
@@ -49,6 +50,10 @@ type Server struct {
|
||||
// Mutation events for event-driven daemon
|
||||
mutationChan chan MutationEvent
|
||||
droppedEvents atomic.Int64 // Counter for dropped mutation events
|
||||
// Recent mutations buffer for polling (circular buffer, max 100 events)
|
||||
recentMutations []MutationEvent
|
||||
recentMutationsMu sync.RWMutex
|
||||
maxMutationBuffer int
|
||||
}
|
||||
|
||||
// Mutation event types
|
||||
@@ -93,19 +98,21 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
socketPath: socketPath,
|
||||
workspacePath: workspacePath,
|
||||
dbPath: dbPath,
|
||||
storage: store,
|
||||
shutdownChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
startTime: time.Now(),
|
||||
metrics: NewMetrics(),
|
||||
maxConns: maxConns,
|
||||
connSemaphore: make(chan struct{}, maxConns),
|
||||
requestTimeout: requestTimeout,
|
||||
readyChan: make(chan struct{}),
|
||||
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
|
||||
socketPath: socketPath,
|
||||
workspacePath: workspacePath,
|
||||
dbPath: dbPath,
|
||||
storage: store,
|
||||
shutdownChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
startTime: time.Now(),
|
||||
metrics: NewMetrics(),
|
||||
maxConns: maxConns,
|
||||
connSemaphore: make(chan struct{}, maxConns),
|
||||
requestTimeout: requestTimeout,
|
||||
readyChan: make(chan struct{}),
|
||||
mutationChan: make(chan MutationEvent, mutationBufferSize), // Configurable buffer
|
||||
recentMutations: make([]MutationEvent, 0, 100),
|
||||
maxMutationBuffer: 100,
|
||||
}
|
||||
s.lastActivityTime.Store(time.Now())
|
||||
return s
|
||||
@@ -113,18 +120,31 @@ func NewServer(socketPath string, store storage.Storage, workspacePath string, d
|
||||
|
||||
// emitMutation sends a mutation event to the daemon's event-driven loop.
|
||||
// Non-blocking: drops event if channel is full (sync will happen eventually).
|
||||
// Also stores in recent mutations buffer for polling.
|
||||
func (s *Server) emitMutation(eventType, issueID string) {
|
||||
select {
|
||||
case s.mutationChan <- MutationEvent{
|
||||
event := MutationEvent{
|
||||
Type: eventType,
|
||||
IssueID: issueID,
|
||||
Timestamp: time.Now(),
|
||||
}:
|
||||
}
|
||||
|
||||
// Send to mutation channel for daemon
|
||||
select {
|
||||
case s.mutationChan <- event:
|
||||
// Event sent successfully
|
||||
default:
|
||||
// Channel full, increment dropped events counter
|
||||
s.droppedEvents.Add(1)
|
||||
}
|
||||
|
||||
// Store in recent mutations buffer for polling
|
||||
s.recentMutationsMu.Lock()
|
||||
s.recentMutations = append(s.recentMutations, event)
|
||||
// Keep buffer size limited (circular buffer behavior)
|
||||
if len(s.recentMutations) > s.maxMutationBuffer {
|
||||
s.recentMutations = s.recentMutations[1:]
|
||||
}
|
||||
s.recentMutationsMu.Unlock()
|
||||
}
|
||||
|
||||
// MutationChan returns the mutation event channel for the daemon to consume
|
||||
@@ -136,3 +156,36 @@ func (s *Server) MutationChan() <-chan MutationEvent {
|
||||
func (s *Server) ResetDroppedEventsCount() int64 {
|
||||
return s.droppedEvents.Swap(0)
|
||||
}
|
||||
|
||||
// GetRecentMutations returns mutations since the given timestamp
|
||||
func (s *Server) GetRecentMutations(sinceMillis int64) []MutationEvent {
|
||||
s.recentMutationsMu.RLock()
|
||||
defer s.recentMutationsMu.RUnlock()
|
||||
|
||||
var result []MutationEvent
|
||||
for _, m := range s.recentMutations {
|
||||
if m.Timestamp.UnixMilli() > sinceMillis {
|
||||
result = append(result, m)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// handleGetMutations handles the get_mutations RPC operation
|
||||
func (s *Server) handleGetMutations(req *Request) Response {
|
||||
var args GetMutationsArgs
|
||||
if err := json.Unmarshal(req.Args, &args); err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("invalid arguments: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
mutations := s.GetRecentMutations(args.Since)
|
||||
data, _ := json.Marshal(mutations)
|
||||
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
277
internal/rpc/server_mutations_test.go
Normal file
277
internal/rpc/server_mutations_test.go
Normal file
@@ -0,0 +1,277 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage/memory"
|
||||
)
|
||||
|
||||
func TestEmitMutation(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Emit a mutation
|
||||
server.emitMutation(MutationCreate, "bd-123")
|
||||
|
||||
// Check that mutation was stored in buffer
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) != 1 {
|
||||
t.Fatalf("expected 1 mutation, got %d", len(mutations))
|
||||
}
|
||||
|
||||
if mutations[0].Type != MutationCreate {
|
||||
t.Errorf("expected type %s, got %s", MutationCreate, mutations[0].Type)
|
||||
}
|
||||
|
||||
if mutations[0].IssueID != "bd-123" {
|
||||
t.Errorf("expected issue ID bd-123, got %s", mutations[0].IssueID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRecentMutations_EmptyBuffer(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) != 0 {
|
||||
t.Errorf("expected empty mutations, got %d", len(mutations))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRecentMutations_TimestampFiltering(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Emit mutations with delays
|
||||
server.emitMutation(MutationCreate, "bd-1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
checkpoint := time.Now().UnixMilli()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
server.emitMutation(MutationUpdate, "bd-2")
|
||||
server.emitMutation(MutationUpdate, "bd-3")
|
||||
|
||||
// Get mutations after checkpoint
|
||||
mutations := server.GetRecentMutations(checkpoint)
|
||||
|
||||
if len(mutations) != 2 {
|
||||
t.Fatalf("expected 2 mutations after checkpoint, got %d", len(mutations))
|
||||
}
|
||||
|
||||
// Verify the mutations are bd-2 and bd-3
|
||||
ids := make(map[string]bool)
|
||||
for _, m := range mutations {
|
||||
ids[m.IssueID] = true
|
||||
}
|
||||
|
||||
if !ids["bd-2"] || !ids["bd-3"] {
|
||||
t.Errorf("expected bd-2 and bd-3, got %v", ids)
|
||||
}
|
||||
|
||||
if ids["bd-1"] {
|
||||
t.Errorf("bd-1 should be filtered out by timestamp")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRecentMutations_CircularBuffer(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Emit more than maxMutationBuffer (100) mutations
|
||||
for i := 0; i < 150; i++ {
|
||||
server.emitMutation(MutationCreate, "bd-"+string(rune(i)))
|
||||
time.Sleep(time.Millisecond) // Ensure different timestamps
|
||||
}
|
||||
|
||||
// Buffer should only keep last 100
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) != 100 {
|
||||
t.Errorf("expected 100 mutations (circular buffer limit), got %d", len(mutations))
|
||||
}
|
||||
|
||||
// First mutation should be from iteration 50 (150-100)
|
||||
firstID := mutations[0].IssueID
|
||||
expectedFirstID := "bd-" + string(rune(50))
|
||||
if firstID != expectedFirstID {
|
||||
t.Errorf("expected first mutation to be %s (after circular buffer wraparound), got %s", expectedFirstID, firstID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRecentMutations_ConcurrentAccess(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Simulate concurrent writes and reads
|
||||
done := make(chan bool)
|
||||
|
||||
// Writer goroutine
|
||||
go func() {
|
||||
for i := 0; i < 50; i++ {
|
||||
server.emitMutation(MutationUpdate, "bd-write")
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
done <- true
|
||||
}()
|
||||
|
||||
// Reader goroutine
|
||||
go func() {
|
||||
for i := 0; i < 50; i++ {
|
||||
_ = server.GetRecentMutations(0)
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
done <- true
|
||||
}()
|
||||
|
||||
// Wait for both to complete
|
||||
<-done
|
||||
<-done
|
||||
|
||||
// Verify no race conditions (test will fail with -race flag if there are)
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) == 0 {
|
||||
t.Error("expected some mutations after concurrent access")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleGetMutations(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Emit some mutations
|
||||
server.emitMutation(MutationCreate, "bd-1")
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
checkpoint := time.Now().UnixMilli()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
server.emitMutation(MutationUpdate, "bd-2")
|
||||
|
||||
// Create RPC request
|
||||
args := GetMutationsArgs{Since: checkpoint}
|
||||
argsJSON, _ := json.Marshal(args)
|
||||
|
||||
req := &Request{
|
||||
Operation: OpGetMutations,
|
||||
Args: argsJSON,
|
||||
}
|
||||
|
||||
// Handle request
|
||||
resp := server.handleGetMutations(req)
|
||||
|
||||
if !resp.Success {
|
||||
t.Fatalf("expected successful response, got error: %s", resp.Error)
|
||||
}
|
||||
|
||||
// Parse response
|
||||
var mutations []MutationEvent
|
||||
if err := json.Unmarshal(resp.Data, &mutations); err != nil {
|
||||
t.Fatalf("failed to unmarshal response: %v", err)
|
||||
}
|
||||
|
||||
if len(mutations) != 1 {
|
||||
t.Errorf("expected 1 mutation, got %d", len(mutations))
|
||||
}
|
||||
|
||||
if len(mutations) > 0 && mutations[0].IssueID != "bd-2" {
|
||||
t.Errorf("expected bd-2, got %s", mutations[0].IssueID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleGetMutations_InvalidArgs(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Create RPC request with invalid JSON
|
||||
req := &Request{
|
||||
Operation: OpGetMutations,
|
||||
Args: []byte("invalid json"),
|
||||
}
|
||||
|
||||
// Handle request
|
||||
resp := server.handleGetMutations(req)
|
||||
|
||||
if resp.Success {
|
||||
t.Error("expected error response for invalid args")
|
||||
}
|
||||
|
||||
if resp.Error == "" {
|
||||
t.Error("expected error message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMutationEventTypes(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Test all mutation types
|
||||
types := []string{
|
||||
MutationCreate,
|
||||
MutationUpdate,
|
||||
MutationDelete,
|
||||
MutationComment,
|
||||
}
|
||||
|
||||
for _, mutationType := range types {
|
||||
server.emitMutation(mutationType, "bd-test")
|
||||
}
|
||||
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) != len(types) {
|
||||
t.Fatalf("expected %d mutations, got %d", len(types), len(mutations))
|
||||
}
|
||||
|
||||
// Verify each type was stored correctly
|
||||
foundTypes := make(map[string]bool)
|
||||
for _, m := range mutations {
|
||||
foundTypes[m.Type] = true
|
||||
}
|
||||
|
||||
for _, expectedType := range types {
|
||||
if !foundTypes[expectedType] {
|
||||
t.Errorf("expected mutation type %s not found", expectedType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMutationTimestamps(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
before := time.Now()
|
||||
server.emitMutation(MutationCreate, "bd-123")
|
||||
after := time.Now()
|
||||
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) != 1 {
|
||||
t.Fatalf("expected 1 mutation, got %d", len(mutations))
|
||||
}
|
||||
|
||||
timestamp := mutations[0].Timestamp
|
||||
if timestamp.Before(before) || timestamp.After(after) {
|
||||
t.Errorf("mutation timestamp %v is outside expected range [%v, %v]", timestamp, before, after)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmitMutation_NonBlocking(t *testing.T) {
|
||||
store := memory.New("/tmp/test.jsonl")
|
||||
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
|
||||
|
||||
// Don't consume from mutationChan to test non-blocking behavior
|
||||
// Fill the buffer (default size is 512 from BEADS_MUTATION_BUFFER or default)
|
||||
for i := 0; i < 600; i++ {
|
||||
// This should not block even when channel is full
|
||||
server.emitMutation(MutationCreate, "bd-test")
|
||||
}
|
||||
|
||||
// Verify mutations were still stored in recent buffer
|
||||
mutations := server.GetRecentMutations(0)
|
||||
if len(mutations) == 0 {
|
||||
t.Error("expected mutations in recent buffer even when channel is full")
|
||||
}
|
||||
|
||||
// Verify buffer is capped at 100 (maxMutationBuffer)
|
||||
if len(mutations) > 100 {
|
||||
t.Errorf("expected at most 100 mutations in buffer, got %d", len(mutations))
|
||||
}
|
||||
}
|
||||
@@ -201,6 +201,8 @@ func (s *Server) handleRequest(req *Request) Response {
|
||||
resp = s.handleImport(req)
|
||||
case OpEpicStatus:
|
||||
resp = s.handleEpicStatus(req)
|
||||
case OpGetMutations:
|
||||
resp = s.handleGetMutations(req)
|
||||
case OpShutdown:
|
||||
resp = s.handleShutdown(req)
|
||||
default:
|
||||
|
||||
Reference in New Issue
Block a user