Files
beads/internal/rpc/server_mutations_test.go
Markus Flür e7f532db93 Implementing an RPC monitoring solution with a web-ui as implementation example. (#244)
* bd sync: 2025-10-30 12:12:27

* Working on frontend

* bd sync: 2025-11-06 16:55:55

* feat: finish bd monitor human viewer

* Merge conflicts resolved and added tests

* bd sync: 2025-11-06 17:23:41

* bd sync: 2025-11-06 17:34:52

* feat: Add reload button and multiselect status filter to monitor

- Changed status filter from single select to multiselect with 'Open' selected by default
- Added reload button with visual feedback (hover/active states)
- Updated filterIssues() to handle multiple selected statuses
- Added reloadData() function that reloads both stats and issues
- Improved responsive design for mobile devices
- Filter controls now use flexbox layout with better spacing

* fix: Update monitor statistics to show Total, In Progress, Open, Closed

- Replaced 'Ready to Work' stat with 'In Progress' stat
- Reordered stats to show logical progression: Total -> In Progress -> Open -> Closed
- Updated loadStats() to fetch in-progress count from stats API
- Removed unnecessary separate API call for ready count

* fix: Correct API field names in monitor stats JavaScript

The JavaScript was using incorrect field names (stats.total, stats.by_status)
that don't match the actual types.Statistics struct which uses flat fields
with underscores (total_issues, in_progress_issues, etc).

Fixed by updating loadStats() to use correct field names:
- stats.total -> stats.total_issues
- stats.by_status?.['in-progress'] -> stats.in_progress_issues
- stats.by_status?.open -> stats.open_issues
- stats.by_status?.closed -> stats.closed_issues

Fixes beads-9

* bd sync: 2025-11-06 17:51:24

* bd sync: 2025-11-06 17:56:09

* fix: Make monitor require daemon to prevent SQLite locking

Implemented Option 1 from beads-eel: monitor now requires daemon and never
opens direct SQLite connection.

Changes:
- Added 'monitor' to noDbCommands list in main.go to skip normal DB initialization
- Added validateDaemonForMonitor() PreRun function that:
  - Finds database path using beads.FindDatabasePath()
  - Validates daemon is running and healthy
  - Fails gracefully with clear error message if no daemon
  - Only uses RPC connection, never opens SQLite directly

Benefits:
- Eliminates SQLite locking conflicts between monitor and daemon
- Users can now close/update issues via CLI while monitor runs
- Clear error messages guide users to start daemon first

Fixes beads-eel

* bd sync: 2025-11-06 18:03:50

* docs: Add bd daemons restart subcommand documentation

Added documentation for the 'bd daemons restart' subcommand across all documentation files:

- commands/daemons.md: Added full restart subcommand section with synopsis, description, arguments, flags, and examples
- README.md: Added restart examples to daemon management section
- AGENTS.md: Added restart examples with --json flag for agents

The restart command gracefully stops and starts a specific daemon by workspace path or PID,
useful after upgrading bd or when a daemon needs refreshing.

Fixes beads-11

* bd sync: 2025-11-06 18:13:16

* Separated the web ui from the general monitoring functionality

---------

Co-authored-by: Steve Yegge <stevey@sourcegraph.com>
2025-11-07 09:49:12 -08:00

278 lines
7.3 KiB
Go

package rpc
import (
"encoding/json"
"testing"
"time"
"github.com/steveyegge/beads/internal/storage/memory"
)
func TestEmitMutation(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit a mutation
server.emitMutation(MutationCreate, "bd-123")
// Check that mutation was stored in buffer
mutations := server.GetRecentMutations(0)
if len(mutations) != 1 {
t.Fatalf("expected 1 mutation, got %d", len(mutations))
}
if mutations[0].Type != MutationCreate {
t.Errorf("expected type %s, got %s", MutationCreate, mutations[0].Type)
}
if mutations[0].IssueID != "bd-123" {
t.Errorf("expected issue ID bd-123, got %s", mutations[0].IssueID)
}
}
func TestGetRecentMutations_EmptyBuffer(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
mutations := server.GetRecentMutations(0)
if len(mutations) != 0 {
t.Errorf("expected empty mutations, got %d", len(mutations))
}
}
func TestGetRecentMutations_TimestampFiltering(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit mutations with delays
server.emitMutation(MutationCreate, "bd-1")
time.Sleep(10 * time.Millisecond)
checkpoint := time.Now().UnixMilli()
time.Sleep(10 * time.Millisecond)
server.emitMutation(MutationUpdate, "bd-2")
server.emitMutation(MutationUpdate, "bd-3")
// Get mutations after checkpoint
mutations := server.GetRecentMutations(checkpoint)
if len(mutations) != 2 {
t.Fatalf("expected 2 mutations after checkpoint, got %d", len(mutations))
}
// Verify the mutations are bd-2 and bd-3
ids := make(map[string]bool)
for _, m := range mutations {
ids[m.IssueID] = true
}
if !ids["bd-2"] || !ids["bd-3"] {
t.Errorf("expected bd-2 and bd-3, got %v", ids)
}
if ids["bd-1"] {
t.Errorf("bd-1 should be filtered out by timestamp")
}
}
func TestGetRecentMutations_CircularBuffer(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit more than maxMutationBuffer (100) mutations
for i := 0; i < 150; i++ {
server.emitMutation(MutationCreate, "bd-"+string(rune(i)))
time.Sleep(time.Millisecond) // Ensure different timestamps
}
// Buffer should only keep last 100
mutations := server.GetRecentMutations(0)
if len(mutations) != 100 {
t.Errorf("expected 100 mutations (circular buffer limit), got %d", len(mutations))
}
// First mutation should be from iteration 50 (150-100)
firstID := mutations[0].IssueID
expectedFirstID := "bd-" + string(rune(50))
if firstID != expectedFirstID {
t.Errorf("expected first mutation to be %s (after circular buffer wraparound), got %s", expectedFirstID, firstID)
}
}
func TestGetRecentMutations_ConcurrentAccess(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Simulate concurrent writes and reads
done := make(chan bool)
// Writer goroutine
go func() {
for i := 0; i < 50; i++ {
server.emitMutation(MutationUpdate, "bd-write")
time.Sleep(time.Millisecond)
}
done <- true
}()
// Reader goroutine
go func() {
for i := 0; i < 50; i++ {
_ = server.GetRecentMutations(0)
time.Sleep(time.Millisecond)
}
done <- true
}()
// Wait for both to complete
<-done
<-done
// Verify no race conditions (test will fail with -race flag if there are)
mutations := server.GetRecentMutations(0)
if len(mutations) == 0 {
t.Error("expected some mutations after concurrent access")
}
}
func TestHandleGetMutations(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Emit some mutations
server.emitMutation(MutationCreate, "bd-1")
time.Sleep(10 * time.Millisecond)
checkpoint := time.Now().UnixMilli()
time.Sleep(10 * time.Millisecond)
server.emitMutation(MutationUpdate, "bd-2")
// Create RPC request
args := GetMutationsArgs{Since: checkpoint}
argsJSON, _ := json.Marshal(args)
req := &Request{
Operation: OpGetMutations,
Args: argsJSON,
}
// Handle request
resp := server.handleGetMutations(req)
if !resp.Success {
t.Fatalf("expected successful response, got error: %s", resp.Error)
}
// Parse response
var mutations []MutationEvent
if err := json.Unmarshal(resp.Data, &mutations); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
}
if len(mutations) != 1 {
t.Errorf("expected 1 mutation, got %d", len(mutations))
}
if len(mutations) > 0 && mutations[0].IssueID != "bd-2" {
t.Errorf("expected bd-2, got %s", mutations[0].IssueID)
}
}
func TestHandleGetMutations_InvalidArgs(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Create RPC request with invalid JSON
req := &Request{
Operation: OpGetMutations,
Args: []byte("invalid json"),
}
// Handle request
resp := server.handleGetMutations(req)
if resp.Success {
t.Error("expected error response for invalid args")
}
if resp.Error == "" {
t.Error("expected error message")
}
}
func TestMutationEventTypes(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Test all mutation types
types := []string{
MutationCreate,
MutationUpdate,
MutationDelete,
MutationComment,
}
for _, mutationType := range types {
server.emitMutation(mutationType, "bd-test")
}
mutations := server.GetRecentMutations(0)
if len(mutations) != len(types) {
t.Fatalf("expected %d mutations, got %d", len(types), len(mutations))
}
// Verify each type was stored correctly
foundTypes := make(map[string]bool)
for _, m := range mutations {
foundTypes[m.Type] = true
}
for _, expectedType := range types {
if !foundTypes[expectedType] {
t.Errorf("expected mutation type %s not found", expectedType)
}
}
}
func TestMutationTimestamps(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
before := time.Now()
server.emitMutation(MutationCreate, "bd-123")
after := time.Now()
mutations := server.GetRecentMutations(0)
if len(mutations) != 1 {
t.Fatalf("expected 1 mutation, got %d", len(mutations))
}
timestamp := mutations[0].Timestamp
if timestamp.Before(before) || timestamp.After(after) {
t.Errorf("mutation timestamp %v is outside expected range [%v, %v]", timestamp, before, after)
}
}
func TestEmitMutation_NonBlocking(t *testing.T) {
store := memory.New("/tmp/test.jsonl")
server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db")
// Don't consume from mutationChan to test non-blocking behavior
// Fill the buffer (default size is 512 from BEADS_MUTATION_BUFFER or default)
for i := 0; i < 600; i++ {
// This should not block even when channel is full
server.emitMutation(MutationCreate, "bd-test")
}
// Verify mutations were still stored in recent buffer
mutations := server.GetRecentMutations(0)
if len(mutations) == 0 {
t.Error("expected mutations in recent buffer even when channel is full")
}
// Verify buffer is capped at 100 (maxMutationBuffer)
if len(mutations) > 100 {
t.Errorf("expected at most 100 mutations in buffer, got %d", len(mutations))
}
}