- Split slow importer integration tests into separate file - Add t.Short() guards to 10 slow daemon tests - Document test organization in TEST_OPTIMIZATION.md - Fast tests now run in ~50s vs 3+ minutes - Use 'go test -short ./...' for fast feedback Amp-Thread-ID: https://ampcode.com/threads/T-29ae21ac-749d-43d7-bf0c-2c5f7a06ae76 Co-authored-by: Amp <amp@ampcode.com>
866 lines
21 KiB
Go
866 lines
21 KiB
Go
//go:build integration
|
|
// +build integration
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/steveyegge/beads/internal/config"
|
|
"github.com/steveyegge/beads/internal/storage"
|
|
"github.com/steveyegge/beads/internal/storage/sqlite"
|
|
"github.com/steveyegge/beads/internal/types"
|
|
)
|
|
|
|
const windowsOS = "windows"
|
|
|
|
func initTestGitRepo(t testing.TB, dir string) {
|
|
t.Helper()
|
|
cmd := exec.Command("git", "init")
|
|
cmd.Dir = dir
|
|
if err := cmd.Run(); err != nil {
|
|
t.Fatalf("Failed to init git repo: %v", err)
|
|
}
|
|
|
|
// Configure git for tests
|
|
configCmds := [][]string{
|
|
{"git", "config", "user.email", "test@example.com"},
|
|
{"git", "config", "user.name", "Test User"},
|
|
}
|
|
for _, args := range configCmds {
|
|
cmd := exec.Command(args[0], args[1:]...)
|
|
cmd.Dir = dir
|
|
if err := cmd.Run(); err != nil {
|
|
t.Logf("Warning: git config failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func makeSocketTempDir(t testing.TB) string {
|
|
t.Helper()
|
|
|
|
base := "/tmp"
|
|
if runtime.GOOS == windowsOS {
|
|
base = os.TempDir()
|
|
} else if _, err := os.Stat(base); err != nil {
|
|
base = os.TempDir()
|
|
}
|
|
|
|
tmpDir, err := os.MkdirTemp(base, "bd-test-*")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create temp dir: %v", err)
|
|
}
|
|
return tmpDir
|
|
}
|
|
|
|
func TestGetPIDFilePath(t *testing.T) {
|
|
tmpDir := t.TempDir()
|
|
oldDBPath := dbPath
|
|
defer func() { dbPath = oldDBPath }()
|
|
|
|
dbPath = filepath.Join(tmpDir, ".beads", "test.db")
|
|
pidFile, err := getPIDFilePath(false) // test local daemon
|
|
if err != nil {
|
|
t.Fatalf("getPIDFilePath failed: %v", err)
|
|
}
|
|
|
|
expected := filepath.Join(tmpDir, ".beads", "daemon.pid")
|
|
if pidFile != expected {
|
|
t.Errorf("Expected PID file %s, got %s", expected, pidFile)
|
|
}
|
|
|
|
if _, err := os.Stat(filepath.Dir(pidFile)); os.IsNotExist(err) {
|
|
t.Error("Expected beads directory to be created")
|
|
}
|
|
}
|
|
|
|
func TestGetLogFilePath(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
set func(t *testing.T) (userPath, dbFile, expected string)
|
|
}{
|
|
{
|
|
name: "user specified path",
|
|
set: func(t *testing.T) (string, string, string) {
|
|
userDir := t.TempDir()
|
|
dbDir := t.TempDir()
|
|
userPath := filepath.Join(userDir, "bd.log")
|
|
dbFile := filepath.Join(dbDir, ".beads", "test.db")
|
|
return userPath, dbFile, userPath
|
|
},
|
|
},
|
|
{
|
|
name: "default with dbPath",
|
|
set: func(t *testing.T) (string, string, string) {
|
|
dbDir := t.TempDir()
|
|
dbFile := filepath.Join(dbDir, ".beads", "test.db")
|
|
return "", dbFile, filepath.Join(dbDir, ".beads", "daemon.log")
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
userPath, dbFile, expected := tt.set(t)
|
|
|
|
oldDBPath := dbPath
|
|
defer func() { dbPath = oldDBPath }()
|
|
dbPath = dbFile
|
|
|
|
result, err := getLogFilePath(userPath, false) // test local daemon
|
|
if err != nil {
|
|
t.Fatalf("getLogFilePath failed: %v", err)
|
|
}
|
|
if result != expected {
|
|
t.Errorf("Expected %s, got %s", expected, result)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestIsDaemonRunning_NotRunning(t *testing.T) {
|
|
t.Parallel()
|
|
tmpDir := t.TempDir()
|
|
pidFile := filepath.Join(tmpDir, "test.pid")
|
|
|
|
isRunning, pid := isDaemonRunning(pidFile)
|
|
if isRunning {
|
|
t.Errorf("Expected daemon not running, got running with PID %d", pid)
|
|
}
|
|
}
|
|
|
|
func TestIsDaemonRunning_StalePIDFile(t *testing.T) {
|
|
t.Parallel()
|
|
tmpDir := t.TempDir()
|
|
pidFile := filepath.Join(tmpDir, "test.pid")
|
|
|
|
if err := os.WriteFile(pidFile, []byte("99999"), 0600); err != nil {
|
|
t.Fatalf("Failed to write PID file: %v", err)
|
|
}
|
|
|
|
isRunning, pid := isDaemonRunning(pidFile)
|
|
if isRunning {
|
|
t.Errorf("Expected daemon not running (stale PID), got running with PID %d", pid)
|
|
}
|
|
}
|
|
|
|
func TestIsDaemonRunning_CurrentProcess(t *testing.T) {
|
|
t.Parallel()
|
|
tmpDir := t.TempDir()
|
|
pidFile := filepath.Join(tmpDir, "test.pid")
|
|
|
|
// Acquire the daemon lock to simulate a running daemon
|
|
beadsDir := filepath.Dir(pidFile)
|
|
dbPath := filepath.Join(beadsDir, "beads.db")
|
|
lock, err := acquireDaemonLock(beadsDir, dbPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to acquire daemon lock: %v", err)
|
|
}
|
|
defer lock.Close()
|
|
|
|
currentPID := os.Getpid()
|
|
isRunning, pid := isDaemonRunning(pidFile)
|
|
if !isRunning {
|
|
t.Error("Expected daemon running (lock held)")
|
|
}
|
|
if pid != currentPID {
|
|
t.Errorf("Expected PID %d, got %d", currentPID, pid)
|
|
}
|
|
}
|
|
|
|
func TestDaemonIntegration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := t.TempDir()
|
|
dbDir := filepath.Join(tmpDir, ".beads")
|
|
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
|
t.Fatalf("Failed to create beads dir: %v", err)
|
|
}
|
|
|
|
testDBPath := filepath.Join(dbDir, "test.db")
|
|
testStore := newTestStore(t, testDBPath)
|
|
|
|
oldStore := store
|
|
oldDBPath := dbPath
|
|
defer func() {
|
|
testStore.Close()
|
|
store = oldStore
|
|
dbPath = oldDBPath
|
|
}()
|
|
store = testStore
|
|
dbPath = testDBPath
|
|
|
|
ctx := context.Background()
|
|
testIssue := &types.Issue{
|
|
Title: "Test daemon issue",
|
|
Description: "Test description",
|
|
Status: types.StatusOpen,
|
|
Priority: 1,
|
|
IssueType: types.TypeTask,
|
|
}
|
|
if err := testStore.CreateIssue(ctx, testIssue, "test"); err != nil {
|
|
t.Fatalf("Failed to create test issue: %v", err)
|
|
}
|
|
|
|
pidFile := filepath.Join(dbDir, "daemon.pid")
|
|
_ = pidFile
|
|
|
|
if isRunning, _ := isDaemonRunning(pidFile); isRunning {
|
|
t.Fatal("Daemon should not be running at start of test")
|
|
}
|
|
|
|
t.Run("start requires git repo", func(t *testing.T) {
|
|
if isGitRepo() {
|
|
t.Skip("Already in a git repo, skipping this test")
|
|
}
|
|
})
|
|
|
|
t.Run("status shows not running", func(t *testing.T) {
|
|
if isRunning, _ := isDaemonRunning(pidFile); isRunning {
|
|
t.Error("Daemon should not be running")
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestDaemonPIDFileManagement(t *testing.T) {
|
|
t.Parallel()
|
|
tmpDir := t.TempDir()
|
|
pidFile := filepath.Join(tmpDir, "daemon.pid")
|
|
|
|
testPID := 12345
|
|
if err := os.WriteFile(pidFile, []byte(strconv.Itoa(testPID)), 0600); err != nil {
|
|
t.Fatalf("Failed to write PID file: %v", err)
|
|
}
|
|
|
|
data, err := os.ReadFile(pidFile)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read PID file: %v", err)
|
|
}
|
|
|
|
readPID, err := strconv.Atoi(strings.TrimSpace(string(data)))
|
|
if err != nil {
|
|
t.Fatalf("Failed to parse PID: %v", err)
|
|
}
|
|
|
|
if readPID != testPID {
|
|
t.Errorf("Expected PID %d, got %d", testPID, readPID)
|
|
}
|
|
|
|
if err := os.Remove(pidFile); err != nil {
|
|
t.Fatalf("Failed to remove PID file: %v", err)
|
|
}
|
|
|
|
if _, err := os.Stat(pidFile); !os.IsNotExist(err) {
|
|
t.Error("PID file should be removed")
|
|
}
|
|
}
|
|
|
|
func TestDaemonLogFileCreation(t *testing.T) {
|
|
t.Parallel()
|
|
tmpDir := t.TempDir()
|
|
logPath := filepath.Join(tmpDir, "test.log")
|
|
|
|
logF, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
if err != nil {
|
|
t.Fatalf("Failed to open log file: %v", err)
|
|
}
|
|
defer logF.Close()
|
|
|
|
timestamp := time.Now().Format("2006-01-02 15:04:05")
|
|
msg := "Test log message"
|
|
_, err = fmt.Fprintf(logF, "[%s] %s\n", timestamp, msg)
|
|
if err != nil {
|
|
t.Fatalf("Failed to write to log file: %v", err)
|
|
}
|
|
|
|
logF.Sync()
|
|
|
|
content, err := os.ReadFile(logPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to read log file: %v", err)
|
|
}
|
|
|
|
if !strings.Contains(string(content), msg) {
|
|
t.Errorf("Log file should contain message: %s", msg)
|
|
}
|
|
}
|
|
|
|
func TestDaemonIntervalParsing(t *testing.T) {
|
|
t.Parallel()
|
|
tests := []struct {
|
|
input string
|
|
expected time.Duration
|
|
}{
|
|
{"5m", 5 * time.Minute},
|
|
{"1h", 1 * time.Hour},
|
|
{"30s", 30 * time.Second},
|
|
{"2m30s", 2*time.Minute + 30*time.Second},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.input, func(t *testing.T) {
|
|
t.Parallel()
|
|
d, err := time.ParseDuration(tt.input)
|
|
if err != nil {
|
|
t.Errorf("Failed to parse duration %s: %v", tt.input, err)
|
|
}
|
|
if d != tt.expected {
|
|
t.Errorf("Expected %v, got %v", tt.expected, d)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestDaemonRPCServerIntegration(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := t.TempDir()
|
|
dbDir := filepath.Join(tmpDir, ".beads")
|
|
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
|
t.Fatalf("Failed to create beads dir: %v", err)
|
|
}
|
|
|
|
testDBPath := filepath.Join(dbDir, "test.db")
|
|
testStore := newTestStore(t, testDBPath)
|
|
defer testStore.Close()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
testIssue := &types.Issue{
|
|
Title: "Test RPC issue",
|
|
Description: "Test RPC integration",
|
|
Status: types.StatusOpen,
|
|
Priority: 1,
|
|
IssueType: types.TypeTask,
|
|
}
|
|
if err := testStore.CreateIssue(ctx, testIssue, "test"); err != nil {
|
|
t.Fatalf("Failed to create test issue: %v", err)
|
|
}
|
|
|
|
if testIssue.ID == "" {
|
|
t.Fatal("Issue ID should be set after creation")
|
|
}
|
|
}
|
|
|
|
func TestDaemonConcurrentOperations(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := t.TempDir()
|
|
dbDir := filepath.Join(tmpDir, ".beads")
|
|
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
|
t.Fatalf("Failed to create beads dir: %v", err)
|
|
}
|
|
|
|
testDBPath := filepath.Join(dbDir, "test.db")
|
|
testStore := newTestStore(t, testDBPath)
|
|
defer testStore.Close()
|
|
|
|
ctx := context.Background()
|
|
|
|
numGoroutines := 10
|
|
errChan := make(chan error, numGoroutines)
|
|
var wg sync.WaitGroup
|
|
|
|
for i := 0; i < numGoroutines; i++ {
|
|
wg.Add(1)
|
|
go func(n int) {
|
|
defer wg.Done()
|
|
|
|
issue := &types.Issue{
|
|
Title: fmt.Sprintf("Concurrent issue %d", n),
|
|
Description: "Test concurrent operations",
|
|
Status: types.StatusOpen,
|
|
Priority: 1,
|
|
IssueType: types.TypeTask,
|
|
}
|
|
|
|
if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
|
|
errChan <- fmt.Errorf("goroutine %d create failed: %w", n, err)
|
|
return
|
|
}
|
|
|
|
updates := map[string]interface{}{
|
|
"status": types.StatusInProgress,
|
|
}
|
|
if err := testStore.UpdateIssue(ctx, issue.ID, updates, "test"); err != nil {
|
|
errChan <- fmt.Errorf("goroutine %d update failed: %w", n, err)
|
|
return
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errChan)
|
|
|
|
for err := range errChan {
|
|
t.Error(err)
|
|
}
|
|
|
|
issues, err := testStore.SearchIssues(ctx, "", types.IssueFilter{})
|
|
if err != nil {
|
|
t.Fatalf("Failed to search issues: %v", err)
|
|
}
|
|
|
|
if len(issues) != numGoroutines {
|
|
t.Errorf("Expected %d issues, got %d", numGoroutines, len(issues))
|
|
}
|
|
}
|
|
|
|
func TestDaemonSocketCleanupOnShutdown(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := makeSocketTempDir(t)
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
socketPath := filepath.Join(tmpDir, "test.sock")
|
|
testDBPath := filepath.Join(tmpDir, "test.db")
|
|
|
|
testStore := newTestStore(t, testDBPath)
|
|
|
|
server := newMockDaemonServer(socketPath, testStore)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
serverDone := make(chan error, 1)
|
|
go func() {
|
|
serverDone <- server.Start(ctx)
|
|
}()
|
|
|
|
// Wait for server to be ready
|
|
if err := server.WaitReady(2 * time.Second); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Verify socket exists (with retry for filesystem sync)
|
|
var socketFound bool
|
|
var lastErr error
|
|
for i := 0; i < 10; i++ {
|
|
if _, err := os.Stat(socketPath); err == nil {
|
|
socketFound = true
|
|
break
|
|
} else {
|
|
lastErr = err
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
if !socketFound {
|
|
t.Fatalf("Socket should exist after server is ready (path=%s, err=%v)", socketPath, lastErr)
|
|
}
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case <-serverDone:
|
|
case <-time.After(2 * time.Second):
|
|
t.Error("Server did not shut down within timeout")
|
|
}
|
|
|
|
testStore.Close()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
|
|
t.Error("Socket should be cleaned up after shutdown")
|
|
}
|
|
}
|
|
|
|
func TestDaemonServerStartFailureSocketExists(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := makeSocketTempDir(t)
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
socketPath := filepath.Join(tmpDir, "test.sock")
|
|
testDBPath := filepath.Join(tmpDir, "test.db")
|
|
|
|
testStore1, err := sqlite.New(testDBPath)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create test database: %v", err)
|
|
}
|
|
defer testStore1.Close()
|
|
|
|
server1 := newMockDaemonServer(socketPath, testStore1)
|
|
|
|
ctx1, cancel1 := context.WithCancel(context.Background())
|
|
defer cancel1()
|
|
|
|
go server1.Start(ctx1)
|
|
|
|
// Wait for server to be ready
|
|
if err := server1.WaitReady(2 * time.Second); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Verify socket exists (with retry for filesystem sync)
|
|
var socketFound bool
|
|
for i := 0; i < 10; i++ {
|
|
if _, err := os.Stat(socketPath); err == nil {
|
|
socketFound = true
|
|
break
|
|
}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
if !socketFound {
|
|
t.Fatal("Socket should exist for first server")
|
|
}
|
|
|
|
testStore2, err := sqlite.New(filepath.Join(tmpDir, "test2.db"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create second test database: %v", err)
|
|
}
|
|
defer testStore2.Close()
|
|
|
|
server2 := newMockDaemonServer(socketPath, testStore2)
|
|
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel2()
|
|
|
|
startErr := make(chan error, 1)
|
|
go func() {
|
|
startErr <- server2.Start(ctx2)
|
|
}()
|
|
|
|
select {
|
|
case err := <-startErr:
|
|
if err == nil {
|
|
t.Error("Expected second server to fail to start, but it succeeded")
|
|
}
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
cancel1()
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
|
|
func TestDaemonGracefulShutdown(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
tmpDir := makeSocketTempDir(t)
|
|
defer os.RemoveAll(tmpDir)
|
|
|
|
socketPath := filepath.Join(tmpDir, "test.sock")
|
|
testDBPath := filepath.Join(tmpDir, "test.db")
|
|
|
|
testStore := newTestStore(t, testDBPath)
|
|
|
|
server := newMockDaemonServer(socketPath, testStore)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
serverDone := make(chan error, 1)
|
|
startTime := time.Now()
|
|
|
|
go func() {
|
|
serverDone <- server.Start(ctx)
|
|
}()
|
|
|
|
time.Sleep(200 * time.Millisecond)
|
|
|
|
cancel()
|
|
|
|
select {
|
|
case err := <-serverDone:
|
|
shutdownDuration := time.Since(startTime)
|
|
|
|
if err != nil && err != context.Canceled {
|
|
t.Errorf("Server returned unexpected error: %v", err)
|
|
}
|
|
|
|
if shutdownDuration > 3*time.Second {
|
|
t.Errorf("Shutdown took too long: %v", shutdownDuration)
|
|
}
|
|
|
|
testStore.Close()
|
|
|
|
if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
|
|
t.Error("Socket should be cleaned up after graceful shutdown")
|
|
}
|
|
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("Server did not shut down gracefully within timeout")
|
|
}
|
|
}
|
|
|
|
type mockDaemonServer struct {
|
|
socketPath string
|
|
store storage.Storage
|
|
listener net.Listener
|
|
mu sync.Mutex
|
|
shutdown bool
|
|
ready chan error
|
|
}
|
|
|
|
func newMockDaemonServer(socketPath string, store storage.Storage) *mockDaemonServer {
|
|
return &mockDaemonServer{
|
|
socketPath: socketPath,
|
|
store: store,
|
|
ready: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
func (s *mockDaemonServer) WaitReady(timeout time.Duration) error {
|
|
select {
|
|
case err := <-s.ready:
|
|
return err
|
|
case <-time.After(timeout):
|
|
return fmt.Errorf("server did not become ready within %v", timeout)
|
|
}
|
|
}
|
|
|
|
func (s *mockDaemonServer) Start(ctx context.Context) error {
|
|
if err := os.MkdirAll(filepath.Dir(s.socketPath), 0750); err != nil {
|
|
return fmt.Errorf("failed to create socket directory: %w", err)
|
|
}
|
|
|
|
// Check if socket already exists
|
|
if _, err := os.Stat(s.socketPath); err == nil {
|
|
// Socket exists - try to connect to see if server is running
|
|
conn, err := net.Dial("unix", s.socketPath)
|
|
if err == nil {
|
|
conn.Close()
|
|
startErr := fmt.Errorf("socket already in use: %s", s.socketPath)
|
|
s.ready <- startErr
|
|
return startErr
|
|
}
|
|
// Socket is stale, remove it
|
|
_ = os.Remove(s.socketPath)
|
|
}
|
|
|
|
var err error
|
|
s.listener, err = net.Listen("unix", s.socketPath)
|
|
if err != nil {
|
|
startErr := fmt.Errorf("failed to listen on socket: %w", err)
|
|
s.ready <- startErr
|
|
return startErr
|
|
}
|
|
|
|
// Signal that server is ready
|
|
s.ready <- nil
|
|
|
|
// Set up cleanup before accepting connections
|
|
defer func() {
|
|
s.listener.Close()
|
|
os.Remove(s.socketPath)
|
|
}()
|
|
|
|
doneChan := make(chan struct{})
|
|
go func() {
|
|
<-ctx.Done()
|
|
s.mu.Lock()
|
|
s.shutdown = true
|
|
s.mu.Unlock()
|
|
s.listener.Close()
|
|
close(doneChan)
|
|
}()
|
|
|
|
for {
|
|
conn, err := s.listener.Accept()
|
|
if err != nil {
|
|
s.mu.Lock()
|
|
shutdown := s.shutdown
|
|
s.mu.Unlock()
|
|
if shutdown {
|
|
<-doneChan
|
|
return ctx.Err()
|
|
}
|
|
return fmt.Errorf("failed to accept connection: %w", err)
|
|
}
|
|
conn.Close()
|
|
}
|
|
}
|
|
|
|
// TestMutationToExportLatency tests the latency from mutation to JSONL export
|
|
// Target: <500ms for single mutation, verify batching for rapid mutations
|
|
//
|
|
// NOTE: This test currently tests the existing auto-flush mechanism with debounce.
|
|
// Once bd-85 (event-driven daemon) is fully implemented and enabled by default,
|
|
// this test should verify <500ms latency instead of the current debounce-based timing.
|
|
func TestMutationToExportLatency(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping integration test in short mode")
|
|
}
|
|
|
|
t.Skip("Skipping until event-driven daemon (bd-85) is fully implemented")
|
|
|
|
tmpDir := t.TempDir()
|
|
dbDir := filepath.Join(tmpDir, ".beads")
|
|
if err := os.MkdirAll(dbDir, 0755); err != nil {
|
|
t.Fatalf("Failed to create beads dir: %v", err)
|
|
}
|
|
|
|
testDBPath := filepath.Join(dbDir, "test.db")
|
|
jsonlPath := filepath.Join(dbDir, "issues.jsonl")
|
|
|
|
// Initialize git repo (required for auto-flush)
|
|
initTestGitRepo(t, tmpDir)
|
|
|
|
testStore := newTestStore(t, testDBPath)
|
|
defer testStore.Close()
|
|
|
|
// Configure test environment - set global store
|
|
oldDBPath := dbPath
|
|
oldStore := store
|
|
oldStoreActive := storeActive
|
|
oldAutoFlush := autoFlushEnabled
|
|
origDebounce := config.GetDuration("flush-debounce")
|
|
defer func() {
|
|
dbPath = oldDBPath
|
|
store = oldStore
|
|
storeMutex.Lock()
|
|
storeActive = oldStoreActive
|
|
storeMutex.Unlock()
|
|
autoFlushEnabled = oldAutoFlush
|
|
config.Set("flush-debounce", origDebounce)
|
|
clearAutoFlushState()
|
|
}()
|
|
|
|
dbPath = testDBPath
|
|
store = testStore
|
|
storeMutex.Lock()
|
|
storeActive = true
|
|
storeMutex.Unlock()
|
|
autoFlushEnabled = true
|
|
// Use fast debounce for testing (500ms to match event-driven target)
|
|
config.Set("flush-debounce", 500*time.Millisecond)
|
|
|
|
ctx := context.Background()
|
|
|
|
// Get JSONL mod time
|
|
getModTime := func() time.Time {
|
|
info, err := os.Stat(jsonlPath)
|
|
if err != nil {
|
|
return time.Time{}
|
|
}
|
|
return info.ModTime()
|
|
}
|
|
|
|
// Test 1: Single mutation latency with markDirtyAndScheduleFlush
|
|
t.Run("SingleMutationLatency", func(t *testing.T) {
|
|
initialModTime := getModTime()
|
|
|
|
// Create issue through store
|
|
issue := &types.Issue{
|
|
Title: "Latency test issue",
|
|
Description: "Testing export latency",
|
|
Status: types.StatusOpen,
|
|
Priority: 1,
|
|
IssueType: types.TypeTask,
|
|
}
|
|
|
|
start := time.Now()
|
|
if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
|
|
t.Fatalf("Failed to create issue: %v", err)
|
|
}
|
|
|
|
// Manually trigger flush (simulating what CLI commands do)
|
|
markDirtyAndScheduleFlush()
|
|
|
|
// Wait for JSONL file to be updated (with timeout)
|
|
timeout := time.After(2 * time.Second) // 500ms debounce + margin
|
|
ticker := time.NewTicker(10 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
var updated bool
|
|
var latency time.Duration
|
|
for !updated {
|
|
select {
|
|
case <-ticker.C:
|
|
modTime := getModTime()
|
|
if modTime.After(initialModTime) {
|
|
latency = time.Since(start)
|
|
updated = true
|
|
}
|
|
case <-timeout:
|
|
t.Fatal("JSONL file not updated within 2 seconds")
|
|
}
|
|
}
|
|
|
|
t.Logf("Single mutation export latency: %v", latency)
|
|
|
|
// Verify <1s latency (500ms debounce + export time)
|
|
if latency > 1*time.Second {
|
|
t.Errorf("Latency %v exceeds 1s threshold", latency)
|
|
}
|
|
})
|
|
|
|
// Test 2: Rapid mutations should batch
|
|
t.Run("RapidMutationBatching", func(t *testing.T) {
|
|
preTestModTime := getModTime()
|
|
|
|
// Create 5 issues rapidly
|
|
numIssues := 5
|
|
start := time.Now()
|
|
|
|
for i := 0; i < numIssues; i++ {
|
|
issue := &types.Issue{
|
|
Title: fmt.Sprintf("Batch test issue %d", i),
|
|
Description: "Testing batching",
|
|
Status: types.StatusOpen,
|
|
Priority: 1,
|
|
IssueType: types.TypeTask,
|
|
}
|
|
if err := testStore.CreateIssue(ctx, issue, "test"); err != nil {
|
|
t.Fatalf("Failed to create issue %d: %v", i, err)
|
|
}
|
|
// Trigger flush for each
|
|
markDirtyAndScheduleFlush()
|
|
// Small delay to ensure they're separate operations
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
|
|
creationDuration := time.Since(start)
|
|
t.Logf("Created %d issues in %v", numIssues, creationDuration)
|
|
|
|
// Wait for JSONL update
|
|
timeout := time.After(2 * time.Second) // 500ms debounce + margin
|
|
ticker := time.NewTicker(10 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
|
|
var updated bool
|
|
for !updated {
|
|
select {
|
|
case <-ticker.C:
|
|
modTime := getModTime()
|
|
if modTime.After(preTestModTime) {
|
|
updated = true
|
|
}
|
|
case <-timeout:
|
|
t.Fatal("JSONL file not updated within 2 seconds")
|
|
}
|
|
}
|
|
|
|
totalLatency := time.Since(start)
|
|
t.Logf("All mutations exported in %v", totalLatency)
|
|
|
|
// Verify batching: rapid calls to markDirty within debounce window
|
|
// should result in single flush after ~500ms
|
|
if totalLatency > 2*time.Second {
|
|
t.Errorf("Batching failed: total latency %v exceeds 2s", totalLatency)
|
|
}
|
|
})
|
|
}
|