Files
beads/cmd/bd/daemon_test.go
Steve Yegge 0f4b03e262 Optimize test suite: split integration tests, add -short support
- Split slow importer integration tests into separate file
- Add t.Short() guards to 10 slow daemon tests
- Document test organization in TEST_OPTIMIZATION.md
- Fast tests now run in ~50s vs 3+ minutes
- Use 'go test -short ./...' for fast feedback

Amp-Thread-ID: https://ampcode.com/threads/T-29ae21ac-749d-43d7-bf0c-2c5f7a06ae76
Co-authored-by: Amp <amp@ampcode.com>
2025-11-05 20:39:47 -08:00

866 lines
21 KiB
Go

//go:build integration
// +build integration
package main
import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/steveyegge/beads/internal/config"
	"github.com/steveyegge/beads/internal/storage"
	"github.com/steveyegge/beads/internal/storage/sqlite"
	"github.com/steveyegge/beads/internal/types"
)
// windowsOS is the runtime.GOOS value for Windows, where unix-socket
// temp-dir handling differs from POSIX platforms.
const windowsOS = "windows"
// initTestGitRepo turns dir into a git repository and applies the
// user.email / user.name configuration that test commits need.
// Init failures abort the test; config failures are only logged,
// since some environments pre-configure git globally.
func initTestGitRepo(t testing.TB, dir string) {
	t.Helper()
	initCmd := exec.Command("git", "init")
	initCmd.Dir = dir
	if err := initCmd.Run(); err != nil {
		t.Fatalf("Failed to init git repo: %v", err)
	}
	// Apply a minimal identity for commits made during tests.
	for _, kv := range [][2]string{
		{"user.email", "test@example.com"},
		{"user.name", "Test User"},
	} {
		c := exec.Command("git", "config", kv[0], kv[1])
		c.Dir = dir
		if err := c.Run(); err != nil {
			t.Logf("Warning: git config failed: %v", err)
		}
	}
}
// makeSocketTempDir returns a fresh temporary directory suitable for unix
// sockets: it prefers the short /tmp prefix (socket paths have a tight
// length limit) and falls back to os.TempDir() on Windows or when /tmp
// is absent.
func makeSocketTempDir(t testing.TB) string {
	t.Helper()
	parent := os.TempDir()
	if runtime.GOOS != windowsOS {
		if _, err := os.Stat("/tmp"); err == nil {
			parent = "/tmp"
		}
	}
	dir, err := os.MkdirTemp(parent, "bd-test-*")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	return dir
}
// TestGetPIDFilePath verifies that the local daemon's PID file is placed
// next to the database (.beads/daemon.pid) and that resolving the path
// creates the .beads directory as a side effect.
func TestGetPIDFilePath(t *testing.T) {
	tmpDir := t.TempDir()
	// Save and restore the package-level dbPath so other tests are unaffected.
	oldDBPath := dbPath
	defer func() { dbPath = oldDBPath }()
	dbPath = filepath.Join(tmpDir, ".beads", "test.db")
	pidFile, err := getPIDFilePath(false) // test local daemon
	if err != nil {
		t.Fatalf("getPIDFilePath failed: %v", err)
	}
	// The PID file should live beside the database file.
	expected := filepath.Join(tmpDir, ".beads", "daemon.pid")
	if pidFile != expected {
		t.Errorf("Expected PID file %s, got %s", expected, pidFile)
	}
	// getPIDFilePath is expected to have created the containing directory.
	if _, err := os.Stat(filepath.Dir(pidFile)); os.IsNotExist(err) {
		t.Error("Expected beads directory to be created")
	}
}
// TestGetLogFilePath checks two resolution rules for the daemon log path:
// an explicit user-supplied path wins, and the default falls back to
// .beads/daemon.log beside the database.
func TestGetLogFilePath(t *testing.T) {
	cases := []struct {
		name  string
		setup func(t *testing.T) (userPath, dbFile, expected string)
	}{
		{
			name: "user specified path",
			setup: func(t *testing.T) (string, string, string) {
				userDir := t.TempDir()
				dbDir := t.TempDir()
				want := filepath.Join(userDir, "bd.log")
				return want, filepath.Join(dbDir, ".beads", "test.db"), want
			},
		},
		{
			name: "default with dbPath",
			setup: func(t *testing.T) (string, string, string) {
				dbDir := t.TempDir()
				return "", filepath.Join(dbDir, ".beads", "test.db"), filepath.Join(dbDir, ".beads", "daemon.log")
			},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			userPath, dbFile, want := tc.setup(t)
			// Swap the package-level dbPath for this subtest only.
			saved := dbPath
			defer func() { dbPath = saved }()
			dbPath = dbFile
			got, err := getLogFilePath(userPath, false) // test local daemon
			if err != nil {
				t.Fatalf("getLogFilePath failed: %v", err)
			}
			if got != want {
				t.Errorf("Expected %s, got %s", want, got)
			}
		})
	}
}
// TestIsDaemonRunning_NotRunning: when no PID file exists at all,
// isDaemonRunning must report that no daemon is active.
func TestIsDaemonRunning_NotRunning(t *testing.T) {
	t.Parallel()
	pidFile := filepath.Join(t.TempDir(), "test.pid")
	running, pid := isDaemonRunning(pidFile)
	if running {
		t.Errorf("Expected daemon not running, got running with PID %d", pid)
	}
}
// TestIsDaemonRunning_StalePIDFile writes a PID file naming a process that
// cannot exist and verifies the daemon is reported as not running.
func TestIsDaemonRunning_StalePIDFile(t *testing.T) {
	t.Parallel()
	tmpDir := t.TempDir()
	pidFile := filepath.Join(tmpDir, "test.pid")
	// Use a PID above Linux's maximum configurable pid: kernel.pid_max tops
	// out at 2^22 (4194304). The previous hard-coded 99999 could collide
	// with a live process on systems where pid_max is raised (common with
	// systemd), making this test flaky.
	stalePID := 1 << 23
	if err := os.WriteFile(pidFile, []byte(strconv.Itoa(stalePID)), 0600); err != nil {
		t.Fatalf("Failed to write PID file: %v", err)
	}
	isRunning, pid := isDaemonRunning(pidFile)
	if isRunning {
		t.Errorf("Expected daemon not running (stale PID), got running with PID %d", pid)
	}
}
// TestIsDaemonRunning_CurrentProcess acquires the daemon lock in this very
// process and verifies isDaemonRunning reports a live daemon whose PID is
// our own.
func TestIsDaemonRunning_CurrentProcess(t *testing.T) {
	t.Parallel()
	tmpDir := t.TempDir()
	pidFile := filepath.Join(tmpDir, "test.pid")
	// Acquire the daemon lock to simulate a running daemon
	beadsDir := filepath.Dir(pidFile)
	// NOTE: this local dbPath intentionally shadows the package-level
	// dbPath; only the local value is used in this test.
	dbPath := filepath.Join(beadsDir, "beads.db")
	lock, err := acquireDaemonLock(beadsDir, dbPath)
	if err != nil {
		t.Fatalf("Failed to acquire daemon lock: %v", err)
	}
	defer lock.Close()
	currentPID := os.Getpid()
	isRunning, pid := isDaemonRunning(pidFile)
	if !isRunning {
		t.Error("Expected daemon running (lock held)")
	}
	// Since this process holds the lock, the reported PID must be ours.
	if pid != currentPID {
		t.Errorf("Expected PID %d, got %d", currentPID, pid)
	}
}
// TestDaemonIntegration wires a real sqlite-backed store into the
// package-level globals and checks daemon-adjacent behavior (PID file
// state, git-repo requirement) without spawning an actual daemon process.
func TestDaemonIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	tmpDir := t.TempDir()
	dbDir := filepath.Join(tmpDir, ".beads")
	if err := os.MkdirAll(dbDir, 0755); err != nil {
		t.Fatalf("Failed to create beads dir: %v", err)
	}
	testDBPath := filepath.Join(dbDir, "test.db")
	testStore := newTestStore(t, testDBPath)
	// Swap the package-level store/dbPath for the duration of this test and
	// restore them afterwards so other tests see the originals.
	oldStore := store
	oldDBPath := dbPath
	defer func() {
		testStore.Close()
		store = oldStore
		dbPath = oldDBPath
	}()
	store = testStore
	dbPath = testDBPath
	ctx := context.Background()
	testIssue := &types.Issue{
		Title:       "Test daemon issue",
		Description: "Test description",
		Status:      types.StatusOpen,
		Priority:    1,
		IssueType:   types.TypeTask,
	}
	if err := testStore.CreateIssue(ctx, testIssue, "test"); err != nil {
		t.Fatalf("Failed to create test issue: %v", err)
	}
	// Fix: dropped the redundant `_ = pidFile` blank assignment that
	// followed this declaration — pidFile is used immediately below, so
	// the use-suppression was dead code.
	pidFile := filepath.Join(dbDir, "daemon.pid")
	if isRunning, _ := isDaemonRunning(pidFile); isRunning {
		t.Fatal("Daemon should not be running at start of test")
	}
	t.Run("start requires git repo", func(t *testing.T) {
		// Only meaningful when the test runs outside a git checkout.
		if isGitRepo() {
			t.Skip("Already in a git repo, skipping this test")
		}
	})
	t.Run("status shows not running", func(t *testing.T) {
		if isRunning, _ := isDaemonRunning(pidFile); isRunning {
			t.Error("Daemon should not be running")
		}
	})
}
// TestDaemonPIDFileManagement exercises the PID file lifecycle directly:
// write a PID, read and parse it back, then remove the file and confirm
// it is gone.
func TestDaemonPIDFileManagement(t *testing.T) {
	t.Parallel()
	pidFile := filepath.Join(t.TempDir(), "daemon.pid")
	const wantPID = 12345
	if err := os.WriteFile(pidFile, []byte(strconv.Itoa(wantPID)), 0600); err != nil {
		t.Fatalf("Failed to write PID file: %v", err)
	}
	raw, err := os.ReadFile(pidFile)
	if err != nil {
		t.Fatalf("Failed to read PID file: %v", err)
	}
	gotPID, err := strconv.Atoi(strings.TrimSpace(string(raw)))
	if err != nil {
		t.Fatalf("Failed to parse PID: %v", err)
	}
	if gotPID != wantPID {
		t.Errorf("Expected PID %d, got %d", wantPID, gotPID)
	}
	if err := os.Remove(pidFile); err != nil {
		t.Fatalf("Failed to remove PID file: %v", err)
	}
	if _, err := os.Stat(pidFile); !os.IsNotExist(err) {
		t.Error("PID file should be removed")
	}
}
// TestDaemonLogFileCreation verifies that a daemon-style log file can be
// created in append mode, written with a timestamped line, and read back.
func TestDaemonLogFileCreation(t *testing.T) {
	t.Parallel()
	logPath := filepath.Join(t.TempDir(), "test.log")
	f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		t.Fatalf("Failed to open log file: %v", err)
	}
	defer f.Close()
	const msg = "Test log message"
	stamp := time.Now().Format("2006-01-02 15:04:05")
	if _, err := fmt.Fprintf(f, "[%s] %s\n", stamp, msg); err != nil {
		t.Fatalf("Failed to write to log file: %v", err)
	}
	// Flush to disk before reading the file back.
	f.Sync()
	data, err := os.ReadFile(logPath)
	if err != nil {
		t.Fatalf("Failed to read log file: %v", err)
	}
	if !strings.Contains(string(data), msg) {
		t.Errorf("Log file should contain message: %s", msg)
	}
}
// TestDaemonIntervalParsing confirms that the duration strings accepted
// for the daemon's intervals parse to the expected time.Duration values.
func TestDaemonIntervalParsing(t *testing.T) {
	t.Parallel()
	for _, tc := range []struct {
		in   string
		want time.Duration
	}{
		{"5m", 5 * time.Minute},
		{"1h", 1 * time.Hour},
		{"30s", 30 * time.Second},
		{"2m30s", 2*time.Minute + 30*time.Second},
	} {
		tc := tc // shadow for the parallel closure (no-op on Go 1.22+)
		t.Run(tc.in, func(t *testing.T) {
			t.Parallel()
			got, err := time.ParseDuration(tc.in)
			if err != nil {
				t.Errorf("Failed to parse duration %s: %v", tc.in, err)
			}
			if got != tc.want {
				t.Errorf("Expected %v, got %v", tc.want, got)
			}
		})
	}
}
// TestDaemonRPCServerIntegration creates a store the way the daemon's RPC
// server would and verifies that issue creation assigns an ID.
func TestDaemonRPCServerIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	root := t.TempDir()
	beadsDir := filepath.Join(root, ".beads")
	if err := os.MkdirAll(beadsDir, 0755); err != nil {
		t.Fatalf("Failed to create beads dir: %v", err)
	}
	st := newTestStore(t, filepath.Join(beadsDir, "test.db"))
	defer st.Close()
	// Bound the store call so a wedged database cannot hang the test.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	issue := &types.Issue{
		Title:       "Test RPC issue",
		Description: "Test RPC integration",
		Status:      types.StatusOpen,
		Priority:    1,
		IssueType:   types.TypeTask,
	}
	if err := st.CreateIssue(ctx, issue, "test"); err != nil {
		t.Fatalf("Failed to create test issue: %v", err)
	}
	if issue.ID == "" {
		t.Fatal("Issue ID should be set after creation")
	}
}
// TestDaemonConcurrentOperations hammers the store from several goroutines
// (one create plus one status update each) and then verifies every issue
// landed, exercising the store's handling of concurrent writers.
func TestDaemonConcurrentOperations(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	tmpDir := t.TempDir()
	dbDir := filepath.Join(tmpDir, ".beads")
	if err := os.MkdirAll(dbDir, 0755); err != nil {
		t.Fatalf("Failed to create beads dir: %v", err)
	}
	testDBPath := filepath.Join(dbDir, "test.db")
	testStore := newTestStore(t, testDBPath)
	defer testStore.Close()
	ctx := context.Background()
	numGoroutines := 10
	// Buffered to capacity so goroutines never block when reporting errors.
	errChan := make(chan error, numGoroutines)
	var wg sync.WaitGroup
	for i := 0; i < numGoroutines; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			issue := &types.Issue{
				Title:       fmt.Sprintf("Concurrent issue %d", n),
				Description: "Test concurrent operations",
				Status:      types.StatusOpen,
				Priority:    1,
				IssueType:   types.TypeTask,
			}
			if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
				errChan <- fmt.Errorf("goroutine %d create failed: %w", n, err)
				return
			}
			updates := map[string]interface{}{
				"status": types.StatusInProgress,
			}
			if err := testStore.UpdateIssue(ctx, issue.ID, updates, "test"); err != nil {
				errChan <- fmt.Errorf("goroutine %d update failed: %w", n, err)
				return
			}
		}(i)
	}
	wg.Wait()
	close(errChan)
	// Surface every goroutine failure, not just the first one.
	for err := range errChan {
		t.Error(err)
	}
	// An empty filter should return exactly the issues created above.
	issues, err := testStore.SearchIssues(ctx, "", types.IssueFilter{})
	if err != nil {
		t.Fatalf("Failed to search issues: %v", err)
	}
	if len(issues) != numGoroutines {
		t.Errorf("Expected %d issues, got %d", numGoroutines, len(issues))
	}
}
// TestDaemonSocketCleanupOnShutdown starts the mock daemon server, confirms
// its unix socket appears, cancels the context, and asserts the socket file
// is removed once the server exits.
func TestDaemonSocketCleanupOnShutdown(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	// Short-path temp dir: unix socket paths have a small length limit.
	tmpDir := makeSocketTempDir(t)
	defer os.RemoveAll(tmpDir)
	socketPath := filepath.Join(tmpDir, "test.sock")
	testDBPath := filepath.Join(tmpDir, "test.db")
	testStore := newTestStore(t, testDBPath)
	server := newMockDaemonServer(socketPath, testStore)
	ctx, cancel := context.WithCancel(context.Background())
	serverDone := make(chan error, 1)
	go func() {
		serverDone <- server.Start(ctx)
	}()
	// Wait for server to be ready
	if err := server.WaitReady(2 * time.Second); err != nil {
		t.Fatal(err)
	}
	// Verify socket exists (with retry for filesystem sync)
	var socketFound bool
	var lastErr error
	for i := 0; i < 10; i++ {
		if _, err := os.Stat(socketPath); err == nil {
			socketFound = true
			break
		} else {
			lastErr = err
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !socketFound {
		t.Fatalf("Socket should exist after server is ready (path=%s, err=%v)", socketPath, lastErr)
	}
	// Trigger shutdown and wait (bounded) for the server goroutine to exit.
	cancel()
	select {
	case <-serverDone:
	case <-time.After(2 * time.Second):
		t.Error("Server did not shut down within timeout")
	}
	testStore.Close()
	// Brief pause so the server's deferred socket removal can complete.
	time.Sleep(100 * time.Millisecond)
	if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
		t.Error("Socket should be cleaned up after shutdown")
	}
}
// TestDaemonServerStartFailureSocketExists ensures a second server refuses
// to start while a live server already owns the same unix socket.
func TestDaemonServerStartFailureSocketExists(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	tmpDir := makeSocketTempDir(t)
	defer os.RemoveAll(tmpDir)
	socketPath := filepath.Join(tmpDir, "test.sock")
	testDBPath := filepath.Join(tmpDir, "test.db")
	testStore1, err := sqlite.New(testDBPath)
	if err != nil {
		t.Fatalf("Failed to create test database: %v", err)
	}
	defer testStore1.Close()
	server1 := newMockDaemonServer(socketPath, testStore1)
	ctx1, cancel1 := context.WithCancel(context.Background())
	defer cancel1()
	// Start error (if any) is intentionally ignored; readiness is checked
	// via WaitReady below.
	go server1.Start(ctx1)
	// Wait for server to be ready
	if err := server1.WaitReady(2 * time.Second); err != nil {
		t.Fatal(err)
	}
	// Verify socket exists (with retry for filesystem sync)
	var socketFound bool
	for i := 0; i < 10; i++ {
		if _, err := os.Stat(socketPath); err == nil {
			socketFound = true
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !socketFound {
		t.Fatal("Socket should exist for first server")
	}
	// Second server: same socket, different database file.
	testStore2, err := sqlite.New(filepath.Join(tmpDir, "test2.db"))
	if err != nil {
		t.Fatalf("Failed to create second test database: %v", err)
	}
	defer testStore2.Close()
	server2 := newMockDaemonServer(socketPath, testStore2)
	ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel2()
	startErr := make(chan error, 1)
	go func() {
		startErr <- server2.Start(ctx2)
	}()
	select {
	case err := <-startErr:
		if err == nil {
			t.Error("Expected second server to fail to start, but it succeeded")
		}
	case <-time.After(1 * time.Second):
		// No result within 1s: fall through without failing; ctx2's 2s
		// timeout bounds server2's lifetime either way.
	}
	cancel1()
	// Give server1's shutdown path time to remove the socket.
	time.Sleep(200 * time.Millisecond)
}
// TestDaemonGracefulShutdown starts the mock daemon server, cancels its
// context, and verifies it exits promptly, returns only the expected
// cancellation error, and removes its unix socket.
func TestDaemonGracefulShutdown(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	tmpDir := makeSocketTempDir(t)
	defer os.RemoveAll(tmpDir)
	socketPath := filepath.Join(tmpDir, "test.sock")
	testDBPath := filepath.Join(tmpDir, "test.db")
	testStore := newTestStore(t, testDBPath)
	server := newMockDaemonServer(socketPath, testStore)
	ctx, cancel := context.WithCancel(context.Background())
	serverDone := make(chan error, 1)
	startTime := time.Now()
	go func() {
		serverDone <- server.Start(ctx)
	}()
	// Give the server a moment to bind its socket before cancelling.
	time.Sleep(200 * time.Millisecond)
	cancel()
	select {
	case err := <-serverDone:
		shutdownDuration := time.Since(startTime)
		// Fix: use errors.Is instead of `err != context.Canceled`. It still
		// matches a bare context.Canceled but also tolerates wrapped
		// cancellation errors should the server ever wrap its return value.
		if err != nil && !errors.Is(err, context.Canceled) {
			t.Errorf("Server returned unexpected error: %v", err)
		}
		if shutdownDuration > 3*time.Second {
			t.Errorf("Shutdown took too long: %v", shutdownDuration)
		}
		testStore.Close()
		if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
			t.Error("Socket should be cleaned up after graceful shutdown")
		}
	case <-time.After(5 * time.Second):
		t.Fatal("Server did not shut down gracefully within timeout")
	}
}
// mockDaemonServer is a minimal stand-in for the real daemon's RPC server:
// it listens on a unix socket, accepts (and immediately closes)
// connections, and removes the socket file on shutdown.
type mockDaemonServer struct {
	socketPath string          // unix socket the server listens on
	store      storage.Storage // backing store; held but not read by the mock accept loop
	listener   net.Listener    // set by Start once listening
	mu         sync.Mutex      // guards shutdown
	shutdown   bool            // set on ctx cancellation; marks Accept errors as expected
	ready      chan error      // buffered(1); Start sends nil on success or the startup error
}
// newMockDaemonServer builds a mockDaemonServer bound to socketPath and
// backed by store. The readiness channel is buffered so Start never blocks
// when reporting its startup result.
func newMockDaemonServer(socketPath string, store storage.Storage) *mockDaemonServer {
	srv := &mockDaemonServer{
		socketPath: socketPath,
		store:      store,
	}
	srv.ready = make(chan error, 1)
	return srv
}
// WaitReady blocks until Start has signaled readiness (nil) or startup
// failure (non-nil), or until timeout elapses.
func (s *mockDaemonServer) WaitReady(timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case err := <-s.ready:
		return err
	case <-timer.C:
		return fmt.Errorf("server did not become ready within %v", timeout)
	}
}
// Start listens on the server's unix socket and accepts connections until
// ctx is cancelled. It reports startup success or failure exactly once on
// s.ready, reclaims stale socket files it finds, and removes its own
// socket file on exit.
func (s *mockDaemonServer) Start(ctx context.Context) error {
	if err := os.MkdirAll(filepath.Dir(s.socketPath), 0750); err != nil {
		return fmt.Errorf("failed to create socket directory: %w", err)
	}
	// Check if socket already exists
	if _, err := os.Stat(s.socketPath); err == nil {
		// Socket exists - try to connect to see if server is running
		conn, err := net.Dial("unix", s.socketPath)
		if err == nil {
			// Someone answered: a live server owns this socket; refuse to start.
			conn.Close()
			startErr := fmt.Errorf("socket already in use: %s", s.socketPath)
			s.ready <- startErr
			return startErr
		}
		// Socket is stale, remove it
		_ = os.Remove(s.socketPath)
	}
	var err error
	s.listener, err = net.Listen("unix", s.socketPath)
	if err != nil {
		startErr := fmt.Errorf("failed to listen on socket: %w", err)
		s.ready <- startErr
		return startErr
	}
	// Signal that server is ready
	s.ready <- nil
	// Set up cleanup before accepting connections
	defer func() {
		s.listener.Close()
		os.Remove(s.socketPath)
	}()
	doneChan := make(chan struct{})
	go func() {
		// On cancellation: mark shutdown first (under the mutex), then close
		// the listener to unblock the Accept call below.
		<-ctx.Done()
		s.mu.Lock()
		s.shutdown = true
		s.mu.Unlock()
		s.listener.Close()
		close(doneChan)
	}()
	for {
		conn, err := s.listener.Accept()
		if err != nil {
			// Distinguish an Accept error caused by our own shutdown from a
			// genuine failure.
			s.mu.Lock()
			shutdown := s.shutdown
			s.mu.Unlock()
			if shutdown {
				// Wait for the watcher goroutine to finish before returning
				// the context's cancellation error.
				<-doneChan
				return ctx.Err()
			}
			return fmt.Errorf("failed to accept connection: %w", err)
		}
		// The mock performs no RPC handling; just drop the connection.
		conn.Close()
	}
}
// TestMutationToExportLatency tests the latency from mutation to JSONL export
// Target: <500ms for single mutation, verify batching for rapid mutations
//
// NOTE: This test currently tests the existing auto-flush mechanism with debounce.
// Once bd-85 (event-driven daemon) is fully implemented and enabled by default,
// this test should verify <500ms latency instead of the current debounce-based timing.
func TestMutationToExportLatency(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	// Unconditionally skipped until bd-85 lands; kept compiling as a spec.
	t.Skip("Skipping until event-driven daemon (bd-85) is fully implemented")
	tmpDir := t.TempDir()
	dbDir := filepath.Join(tmpDir, ".beads")
	if err := os.MkdirAll(dbDir, 0755); err != nil {
		t.Fatalf("Failed to create beads dir: %v", err)
	}
	testDBPath := filepath.Join(dbDir, "test.db")
	jsonlPath := filepath.Join(dbDir, "issues.jsonl")
	// Initialize git repo (required for auto-flush)
	initTestGitRepo(t, tmpDir)
	testStore := newTestStore(t, testDBPath)
	defer testStore.Close()
	// Configure test environment - set global store
	oldDBPath := dbPath
	oldStore := store
	// NOTE(review): storeActive is read here without storeMutex while the
	// restore below takes the lock — presumably safe because no daemon
	// goroutines run yet; confirm if this test is ever un-skipped.
	oldStoreActive := storeActive
	oldAutoFlush := autoFlushEnabled
	origDebounce := config.GetDuration("flush-debounce")
	defer func() {
		// Restore every piece of global state mutated below.
		dbPath = oldDBPath
		store = oldStore
		storeMutex.Lock()
		storeActive = oldStoreActive
		storeMutex.Unlock()
		autoFlushEnabled = oldAutoFlush
		config.Set("flush-debounce", origDebounce)
		clearAutoFlushState()
	}()
	dbPath = testDBPath
	store = testStore
	storeMutex.Lock()
	storeActive = true
	storeMutex.Unlock()
	autoFlushEnabled = true
	// Use fast debounce for testing (500ms to match event-driven target)
	config.Set("flush-debounce", 500*time.Millisecond)
	ctx := context.Background()
	// Get JSONL mod time
	getModTime := func() time.Time {
		info, err := os.Stat(jsonlPath)
		if err != nil {
			// A missing file reads as the zero time, so any later write
			// registers as "after".
			return time.Time{}
		}
		return info.ModTime()
	}
	// Test 1: Single mutation latency with markDirtyAndScheduleFlush
	t.Run("SingleMutationLatency", func(t *testing.T) {
		initialModTime := getModTime()
		// Create issue through store
		issue := &types.Issue{
			Title:       "Latency test issue",
			Description: "Testing export latency",
			Status:      types.StatusOpen,
			Priority:    1,
			IssueType:   types.TypeTask,
		}
		start := time.Now()
		if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
			t.Fatalf("Failed to create issue: %v", err)
		}
		// Manually trigger flush (simulating what CLI commands do)
		markDirtyAndScheduleFlush()
		// Wait for JSONL file to be updated (with timeout)
		timeout := time.After(2 * time.Second) // 500ms debounce + margin
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		var updated bool
		var latency time.Duration
		for !updated {
			select {
			case <-ticker.C:
				modTime := getModTime()
				if modTime.After(initialModTime) {
					latency = time.Since(start)
					updated = true
				}
			case <-timeout:
				t.Fatal("JSONL file not updated within 2 seconds")
			}
		}
		t.Logf("Single mutation export latency: %v", latency)
		// Verify <1s latency (500ms debounce + export time)
		if latency > 1*time.Second {
			t.Errorf("Latency %v exceeds 1s threshold", latency)
		}
	})
	// Test 2: Rapid mutations should batch
	t.Run("RapidMutationBatching", func(t *testing.T) {
		preTestModTime := getModTime()
		// Create 5 issues rapidly
		numIssues := 5
		start := time.Now()
		for i := 0; i < numIssues; i++ {
			issue := &types.Issue{
				Title:       fmt.Sprintf("Batch test issue %d", i),
				Description: "Testing batching",
				Status:      types.StatusOpen,
				Priority:    1,
				IssueType:   types.TypeTask,
			}
			if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
				t.Fatalf("Failed to create issue %d: %v", i, err)
			}
			// Trigger flush for each
			markDirtyAndScheduleFlush()
			// Small delay to ensure they're separate operations
			time.Sleep(100 * time.Millisecond)
		}
		creationDuration := time.Since(start)
		t.Logf("Created %d issues in %v", numIssues, creationDuration)
		// Wait for JSONL update
		timeout := time.After(2 * time.Second) // 500ms debounce + margin
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		var updated bool
		for !updated {
			select {
			case <-ticker.C:
				modTime := getModTime()
				if modTime.After(preTestModTime) {
					updated = true
				}
			case <-timeout:
				t.Fatal("JSONL file not updated within 2 seconds")
			}
		}
		totalLatency := time.Since(start)
		t.Logf("All mutations exported in %v", totalLatency)
		// Verify batching: rapid calls to markDirty within debounce window
		// should result in single flush after ~500ms
		if totalLatency > 2*time.Second {
			t.Errorf("Batching failed: total latency %v exceeds 2s", totalLatency)
		}
	})
}