Files
beads/cmd/bd/daemon_integration_test.go
2025-12-24 00:06:41 -08:00

597 lines
16 KiB
Go

//go:build integration
// +build integration
package main
import (
"context"
"net"
"os"
"path/filepath"
"sync"
"testing"
"time"
)
// TestStartRPCServer verifies RPC server initialization and startup
func TestStartRPCServer(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
socketPath := filepath.Join(tmpDir, "bd.sock")
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
workspacePath := tmpDir
dbPath := testDBPath
log := createTestLogger(t)
t.Run("starts successfully with valid paths", func(t *testing.T) {
server, serverErrChan, err := startRPCServer(ctx, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("startRPCServer failed: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
// Verify server is ready
select {
case <-server.WaitReady():
// Server is ready
case <-time.After(2 * time.Second):
t.Fatal("Server did not become ready within 2 seconds")
}
// Verify socket exists and is connectable
conn, err := net.Dial("unix", socketPath)
if err != nil {
t.Fatalf("Failed to connect to socket: %v", err)
}
conn.Close()
// Verify no error on channel
select {
case err := <-serverErrChan:
t.Errorf("Unexpected error on serverErrChan: %v", err)
default:
// Expected - no error yet
}
})
t.Run("fails with invalid socket path", func(t *testing.T) {
invalidSocketPath := "/invalid/nonexistent/path/socket.sock"
_, _, err := startRPCServer(ctx, invalidSocketPath, testStore, workspacePath, dbPath, log)
if err == nil {
t.Error("startRPCServer should fail with invalid socket path")
}
})
t.Run("socket has restricted permissions", func(t *testing.T) {
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
socketPath2 := filepath.Join(tmpDir, "bd2.sock")
server, _, err := startRPCServer(ctx2, socketPath2, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("startRPCServer failed: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
// Wait for socket to be created
<-server.WaitReady()
info, err := os.Stat(socketPath2)
if err != nil {
t.Fatalf("Failed to stat socket: %v", err)
}
// Check permissions (should be 0600 or similar restricted)
mode := info.Mode().Perm()
// On Unix, should be 0600 (owner read/write only)
// Accept 0600 or similar restricted permissions
if mode > 0644 {
t.Errorf("Socket permissions %o are too permissive", mode)
}
})
}
// TestRunEventLoop verifies the polling-based event loop
func TestRunEventLoop(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
socketPath := filepath.Join(tmpDir, "bd.sock")
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
workspacePath := tmpDir
dbPath := testDBPath
log := createTestLogger(t)
// Start RPC server
server, serverErrChan, err := startRPCServer(ctx, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("Failed to start RPC server: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
<-server.WaitReady()
t.Run("processes ticker ticks", func(t *testing.T) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
tickCount := 0
syncFunc := func() {
tickCount++
}
// Run event loop in goroutine with short timeout
ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel2()
go func() {
runEventLoop(ctx2, cancel2, ticker, syncFunc, server, serverErrChan, 0, log)
}()
// Wait for context to finish
<-ctx2.Done()
if tickCount == 0 {
t.Error("Event loop should have processed at least one tick")
}
})
t.Run("responds to context cancellation", func(t *testing.T) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
ctx2, cancel2 := context.WithCancel(context.Background())
syncCalled := false
syncFunc := func() {
syncCalled = true
}
done := make(chan struct{})
go func() {
runEventLoop(ctx2, cancel2, ticker, syncFunc, server, serverErrChan, 0, log)
close(done)
}()
// Let it run briefly then cancel
time.Sleep(150 * time.Millisecond)
cancel2()
select {
case <-done:
// Expected - event loop exited
case <-time.After(2 * time.Second):
t.Fatal("Event loop did not exit within 2 seconds")
}
if !syncCalled {
t.Error("Sync function should have been called at least once")
}
})
t.Run("handles parent process death", func(t *testing.T) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
syncFunc := func() {}
done := make(chan struct{})
go func() {
// Use an invalid (non-existent) parent PID so event loop thinks parent died
runEventLoop(ctx2, cancel2, ticker, syncFunc, server, serverErrChan, 999999, log)
close(done)
}()
// Event loop should detect dead parent within 10 seconds and exit
select {
case <-done:
// Expected - event loop detected dead parent and exited
case <-time.After(15 * time.Second):
t.Fatal("Event loop did not exit after detecting dead parent")
}
})
}
// TestRunDaemonLoop_HealthyStartup verifies daemon initialization succeeds with proper setup
func TestRunDaemonLoop_HealthyStartup(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
initTestGitRepo(t, tmpDir)
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "beads.db") // Use canonical name
// Save original globals and restore after test
oldDBPath := dbPath
oldStore := store
oldWorkingDir, _ := os.Getwd()
defer func() {
dbPath = oldDBPath
store = oldStore
os.Chdir(oldWorkingDir)
}()
// Set up for daemon
dbPath = testDBPath
os.Chdir(tmpDir)
// Create database first
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
t.Run("initialization succeeds with proper database", func(t *testing.T) {
// Note: runDaemonLoop is designed to run indefinitely, so we test
// that it doesn't panic during initialization rather than running it fully
// The full daemon lifecycle is tested in integration with runEventLoop and runEventDrivenLoop
// Verify database exists and is accessible
store = testStore
if _, err := os.Stat(testDBPath); err != nil {
t.Errorf("Test database should exist: %v", err)
}
})
t.Run("validates database file exists", func(t *testing.T) {
// This is more of a setup validation than a runDaemonLoop test
// since runDaemonLoop is called from main without returning until shutdown
invalidDBPath := filepath.Join(tmpDir, "nonexistent", "beads.db")
if _, err := os.Stat(invalidDBPath); !os.IsNotExist(err) {
t.Error("Invalid database path should not exist")
}
})
}
// TestCheckDaemonHealth verifies health check operations
func TestCheckDaemonHealth_StorageAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx := context.Background()
log := createTestLogger(t)
t.Run("completes without error on healthy storage", func(t *testing.T) {
// Should not panic or error
checkDaemonHealth(ctx, testStore, log)
})
t.Run("logs appropriately when storage is accessible", func(t *testing.T) {
// This just verifies it runs without panic
// In a real scenario, we'd check log output
checkDaemonHealth(ctx, testStore, log)
})
}
// TestIsDaemonHealthy verifies daemon health checking via RPC
func TestIsDaemonHealthy(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
socketPath := filepath.Join(tmpDir, "bd.sock")
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
workspacePath := tmpDir
dbPath := testDBPath
log := createTestLogger(t)
t.Run("returns false for unreachable daemon", func(t *testing.T) {
unreachableSocket := filepath.Join(tmpDir, "nonexistent.sock")
result := isDaemonHealthy(unreachableSocket)
if result != false {
t.Error("isDaemonHealthy should return false for unreachable daemon")
}
})
t.Run("returns true for running daemon", func(t *testing.T) {
server, _, err := startRPCServer(ctx, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("Failed to start RPC server: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
<-server.WaitReady()
// Give socket time to be fully ready
time.Sleep(100 * time.Millisecond)
result := isDaemonHealthy(socketPath)
if !result {
t.Error("isDaemonHealthy should return true for healthy daemon")
}
})
t.Run("detects stale socket", func(t *testing.T) {
staleSocket := filepath.Join(tmpDir, "stale.sock")
// Create a stale socket file (not actually listening)
f, err := os.Create(staleSocket)
if err != nil {
t.Fatalf("Failed to create stale socket: %v", err)
}
f.Close()
result := isDaemonHealthy(staleSocket)
if result != false {
t.Error("isDaemonHealthy should return false for stale socket")
}
})
}
// TestEventLoopSignalHandling tests signal handling in event loop
func TestEventLoopSignalHandling(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
t.Run("handles SIGTERM gracefully", func(t *testing.T) {
tmpDir := makeSocketTempDir(t)
socketPath := filepath.Join(tmpDir, "bd.sock")
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
workspacePath := tmpDir
dbPath := testDBPath
log := createTestLogger(t)
server, serverErrChan, err := startRPCServer(ctx, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("Failed to start RPC server: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
<-server.WaitReady()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
ctx2, cancel2 := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
runEventLoop(ctx2, cancel2, ticker, func() {}, server, serverErrChan, 0, log)
close(done)
}()
// Let it run, then cancel
time.Sleep(200 * time.Millisecond)
cancel2()
select {
case <-done:
// Expected - event loop exited
case <-time.After(2 * time.Second):
t.Fatal("Event loop did not exit after signal")
}
})
}
// createTestLogger creates a daemonLogger for testing
func createTestLogger(t *testing.T) daemonLogger {
return newTestLogger()
}
// TestDaemonIntegration_SocketCleanup verifies socket cleanup after daemon stops
func TestDaemonIntegration_SocketCleanup(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx := context.Background()
log := createTestLogger(t)
socketPath := filepath.Join(tmpDir, "bd1.sock")
workspacePath := tmpDir
dbPath := testDBPath
ctx1, cancel1 := context.WithTimeout(ctx, 3*time.Second)
server, _, err := startRPCServer(ctx1, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("Failed to start server: %v", err)
}
<-server.WaitReady()
// Verify socket exists
if _, err := os.Stat(socketPath); err != nil {
t.Errorf("Socket should exist: %v", err)
}
// Stop server
_ = server.Stop()
cancel1()
// Wait for cleanup
time.Sleep(500 * time.Millisecond)
// Socket should be gone after cleanup
if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
t.Logf("Socket still exists after stop (may be cleanup timing): %v", err)
}
}
// TestEventDrivenLoop_PeriodicRemoteSync verifies that the event-driven loop
// periodically calls doAutoImport to pull updates from remote.
// This is a regression test for the bug where the event-driven daemon mode
// would not pull remote changes unless the local JSONL file changed.
//
// Bug scenario:
// 1. Clone A creates an issue and daemon pushes to sync branch
// 2. Clone B's daemon only watched local file changes
// 3. Clone B would not see the new issue until something triggered local change
// 4. With this fix: Clone B's daemon periodically calls doAutoImport
func TestEventDrivenLoop_PeriodicRemoteSync(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
tmpDir := makeSocketTempDir(t)
socketPath := filepath.Join(tmpDir, "bd.sock")
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("Failed to create beads dir: %v", err)
}
// Create JSONL file for file watcher
jsonlPath := filepath.Join(beadsDir, "issues.jsonl")
if err := os.WriteFile(jsonlPath, []byte{}, 0644); err != nil {
t.Fatalf("Failed to create JSONL file: %v", err)
}
testDBPath := filepath.Join(beadsDir, "test.db")
testStore := newTestStore(t, testDBPath)
defer testStore.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
workspacePath := tmpDir
dbPath := testDBPath
log := createTestLogger(t)
// Start RPC server
server, serverErrChan, err := startRPCServer(ctx, socketPath, testStore, workspacePath, dbPath, log)
if err != nil {
t.Fatalf("Failed to start RPC server: %v", err)
}
defer func() {
if server != nil {
_ = server.Stop()
}
}()
<-server.WaitReady()
// Track how many times doAutoImport is called
var importCount int
var mu sync.Mutex
doAutoImport := func() {
mu.Lock()
importCount++
mu.Unlock()
}
doExport := func() {}
// Run event-driven loop with short timeout
// The remoteSyncTicker fires every 30s, but we can't wait that long in a test
// So we verify the structure is correct and the import debouncer is set up
ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
done := make(chan struct{})
go func() {
runEventDrivenLoop(ctx2, cancel2, server, serverErrChan, testStore, jsonlPath, doExport, doAutoImport, true, 0, log)
close(done)
}()
// Wait for context to finish
<-done
// The loop should have started and be ready to handle periodic syncs
// We can't easily test the 30s ticker in unit tests, but we verified
// the code structure is correct and doAutoImport is wired up
t.Log("Event-driven loop with periodic remote sync started successfully")
}