refactor: remove legacy autoflush code paths (bd-xsl9)
Remove dual code paths in the autoflush system. FlushManager is now the only code path for auto-flush operations. Changes: - Remove legacy globals: isDirty, needsFullExport, flushTimer - Remove flushToJSONL() wrapper function (was backward-compat shim) - Simplify markDirtyAndScheduleFlush/FullExport to just call FlushManager - Update tests to use FlushManager or flushToJSONLWithState directly FlushManager handles all flush state internally in its run() goroutine, eliminating the need for global state. Sandbox mode and tests that do not need flushing get a no-op when FlushManager is nil. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -283,94 +283,36 @@ func autoImportIfNewer() {
|
|||||||
// Thread-safe: Safe to call from multiple goroutines (no shared mutable state).
|
// Thread-safe: Safe to call from multiple goroutines (no shared mutable state).
|
||||||
// No-op if auto-flush is disabled via --no-auto-flush flag.
|
// No-op if auto-flush is disabled via --no-auto-flush flag.
|
||||||
func markDirtyAndScheduleFlush() {
|
func markDirtyAndScheduleFlush() {
|
||||||
// Use FlushManager if available (new path, fixes bd-52)
|
// Use FlushManager if available
|
||||||
|
// No FlushManager means sandbox mode or test without flush setup - no-op is correct
|
||||||
if flushManager != nil {
|
if flushManager != nil {
|
||||||
flushManager.MarkDirty(false) // Incremental export
|
flushManager.MarkDirty(false) // Incremental export
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Legacy path for backward compatibility with tests
|
|
||||||
if !autoFlushEnabled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
flushMutex.Lock()
|
|
||||||
defer flushMutex.Unlock()
|
|
||||||
|
|
||||||
isDirty = true
|
|
||||||
|
|
||||||
// Cancel existing timer if any
|
|
||||||
if flushTimer != nil {
|
|
||||||
flushTimer.Stop()
|
|
||||||
flushTimer = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedule new flush
|
|
||||||
flushTimer = time.AfterFunc(getDebounceDuration(), func() {
|
|
||||||
flushToJSONL()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// markDirtyAndScheduleFullExport marks DB as needing a full export (for ID-changing operations)
|
// markDirtyAndScheduleFullExport marks DB as needing a full export (for ID-changing operations)
|
||||||
func markDirtyAndScheduleFullExport() {
|
func markDirtyAndScheduleFullExport() {
|
||||||
// Use FlushManager if available (new path, fixes bd-52)
|
// Use FlushManager if available
|
||||||
|
// No FlushManager means sandbox mode or test without flush setup - no-op is correct
|
||||||
if flushManager != nil {
|
if flushManager != nil {
|
||||||
flushManager.MarkDirty(true) // Full export
|
flushManager.MarkDirty(true) // Full export
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Legacy path for backward compatibility with tests
|
|
||||||
if !autoFlushEnabled {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
flushMutex.Lock()
|
|
||||||
defer flushMutex.Unlock()
|
|
||||||
|
|
||||||
isDirty = true
|
|
||||||
needsFullExport = true // Force full export, not incremental
|
|
||||||
|
|
||||||
// Cancel existing timer if any
|
|
||||||
if flushTimer != nil {
|
|
||||||
flushTimer.Stop()
|
|
||||||
flushTimer = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Schedule new flush
|
|
||||||
flushTimer = time.AfterFunc(getDebounceDuration(), func() {
|
|
||||||
flushToJSONL()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// clearAutoFlushState cancels pending flush and marks DB as clean (after manual export)
|
// clearAutoFlushState cancels pending flush and marks DB as clean (after manual export)
|
||||||
func clearAutoFlushState() {
|
func clearAutoFlushState() {
|
||||||
// With FlushManager, clearing state is unnecessary (new path)
|
// With FlushManager, clearing state is unnecessary
|
||||||
// If a flush is pending and fires after manual export, flushToJSONLWithState()
|
// If a flush is pending and fires after manual export, flushToJSONLWithState()
|
||||||
// will detect nothing is dirty and skip the flush. This is harmless.
|
// will detect nothing is dirty and skip the flush. This is harmless.
|
||||||
if flushManager != nil {
|
// Reset failure counters on manual export success
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Legacy path for backward compatibility with tests
|
|
||||||
flushMutex.Lock()
|
flushMutex.Lock()
|
||||||
defer flushMutex.Unlock()
|
|
||||||
|
|
||||||
// Cancel pending timer
|
|
||||||
if flushTimer != nil {
|
|
||||||
flushTimer.Stop()
|
|
||||||
flushTimer = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear dirty flag
|
|
||||||
isDirty = false
|
|
||||||
|
|
||||||
// Reset failure counter (manual export succeeded)
|
|
||||||
flushFailureCount = 0
|
flushFailureCount = 0
|
||||||
lastFlushError = nil
|
lastFlushError = nil
|
||||||
|
flushMutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeJSONLAtomic writes issues to a JSONL file atomically using temp file + rename.
|
// writeJSONLAtomic writes issues to a JSONL file atomically using temp file + rename.
|
||||||
// This is the common implementation used by both flushToJSONL (SQLite mode) and
|
// This is the common implementation used by flushToJSONLWithState (SQLite mode) and
|
||||||
// writeIssuesToJSONL (--no-db mode).
|
// writeIssuesToJSONL (--no-db mode).
|
||||||
//
|
//
|
||||||
// Atomic write pattern:
|
// Atomic write pattern:
|
||||||
@@ -807,38 +749,6 @@ func flushToJSONLWithState(state flushState) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Success! Don't clear global flags here - let the caller manage its own state.
|
// Success! FlushManager manages its local state in run() goroutine.
|
||||||
// FlushManager manages its local state in run() goroutine.
|
|
||||||
// Legacy path clears global state in flushToJSONL() wrapper.
|
|
||||||
recordSuccess()
|
recordSuccess()
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushToJSONL is a backward-compatible wrapper that reads global state.
|
|
||||||
// New code should use FlushManager instead of calling this directly.
|
|
||||||
//
|
|
||||||
// Reads global isDirty and needsFullExport flags, then calls flushToJSONLWithState.
|
|
||||||
// Invoked by the debounce timer or immediately on command exit (for legacy code).
|
|
||||||
func flushToJSONL() {
|
|
||||||
// Read current state and failure count
|
|
||||||
flushMutex.Lock()
|
|
||||||
forceDirty := isDirty
|
|
||||||
forceFullExport := needsFullExport
|
|
||||||
beforeFailCount := flushFailureCount
|
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
// Call new implementation
|
|
||||||
flushToJSONLWithState(flushState{
|
|
||||||
forceDirty: forceDirty,
|
|
||||||
forceFullExport: forceFullExport,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Clear global state only if flush succeeded (legacy path only)
|
|
||||||
// Success is indicated by failure count not increasing
|
|
||||||
flushMutex.Lock()
|
|
||||||
if flushFailureCount == beforeFailCount {
|
|
||||||
// Flush succeeded - clear dirty flags
|
|
||||||
isDirty = false
|
|
||||||
needsFullExport = false
|
|
||||||
}
|
|
||||||
flushMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -129,14 +129,13 @@ func runBDInProcess(t *testing.T, dir string, args ...string) string {
|
|||||||
sandboxMode = false
|
sandboxMode = false
|
||||||
noDb = false
|
noDb = false
|
||||||
autoFlushEnabled = true
|
autoFlushEnabled = true
|
||||||
isDirty = false
|
|
||||||
needsFullExport = false
|
|
||||||
storeActive = false
|
storeActive = false
|
||||||
flushFailureCount = 0
|
flushFailureCount = 0
|
||||||
lastFlushError = nil
|
lastFlushError = nil
|
||||||
if flushTimer != nil {
|
// Shutdown any existing FlushManager
|
||||||
flushTimer.Stop()
|
if flushManager != nil {
|
||||||
flushTimer = nil
|
_ = flushManager.Shutdown()
|
||||||
|
flushManager = nil
|
||||||
}
|
}
|
||||||
// Reset context state
|
// Reset context state
|
||||||
rootCtx = nil
|
rootCtx = nil
|
||||||
|
|||||||
@@ -28,17 +28,15 @@ func TestFallbackToDirectModeEnablesFlush(t *testing.T) {
|
|||||||
origDBPath := dbPath
|
origDBPath := dbPath
|
||||||
origAutoImport := autoImportEnabled
|
origAutoImport := autoImportEnabled
|
||||||
origAutoFlush := autoFlushEnabled
|
origAutoFlush := autoFlushEnabled
|
||||||
origIsDirty := isDirty
|
|
||||||
origNeedsFull := needsFullExport
|
|
||||||
origFlushFailures := flushFailureCount
|
origFlushFailures := flushFailureCount
|
||||||
origLastFlushErr := lastFlushError
|
origLastFlushErr := lastFlushError
|
||||||
|
origFlushManager := flushManager
|
||||||
|
|
||||||
flushMutex.Lock()
|
// Shutdown any existing FlushManager
|
||||||
if flushTimer != nil {
|
if flushManager != nil {
|
||||||
flushTimer.Stop()
|
_ = flushManager.Shutdown()
|
||||||
flushTimer = nil
|
flushManager = nil
|
||||||
}
|
}
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if store != nil && store != origStore {
|
if store != nil && store != origStore {
|
||||||
@@ -54,17 +52,14 @@ func TestFallbackToDirectModeEnablesFlush(t *testing.T) {
|
|||||||
dbPath = origDBPath
|
dbPath = origDBPath
|
||||||
autoImportEnabled = origAutoImport
|
autoImportEnabled = origAutoImport
|
||||||
autoFlushEnabled = origAutoFlush
|
autoFlushEnabled = origAutoFlush
|
||||||
isDirty = origIsDirty
|
|
||||||
needsFullExport = origNeedsFull
|
|
||||||
flushFailureCount = origFlushFailures
|
flushFailureCount = origFlushFailures
|
||||||
lastFlushError = origLastFlushErr
|
lastFlushError = origLastFlushErr
|
||||||
|
|
||||||
flushMutex.Lock()
|
// Restore FlushManager
|
||||||
if flushTimer != nil {
|
if flushManager != nil {
|
||||||
flushTimer.Stop()
|
_ = flushManager.Shutdown()
|
||||||
flushTimer = nil
|
|
||||||
}
|
}
|
||||||
flushMutex.Unlock()
|
flushManager = origFlushManager
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tmpDir := t.TempDir()
|
tmpDir := t.TempDir()
|
||||||
@@ -112,8 +107,6 @@ func TestFallbackToDirectModeEnablesFlush(t *testing.T) {
|
|||||||
daemonStatus = DaemonStatus{}
|
daemonStatus = DaemonStatus{}
|
||||||
autoImportEnabled = false
|
autoImportEnabled = false
|
||||||
autoFlushEnabled = true
|
autoFlushEnabled = true
|
||||||
isDirty = false
|
|
||||||
needsFullExport = false
|
|
||||||
|
|
||||||
if err := fallbackToDirectMode("test fallback"); err != nil {
|
if err := fallbackToDirectMode("test fallback"); err != nil {
|
||||||
t.Fatalf("fallbackToDirectMode failed: %v", err)
|
t.Fatalf("fallbackToDirectMode failed: %v", err)
|
||||||
@@ -131,14 +124,7 @@ func TestFallbackToDirectModeEnablesFlush(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Force a full export and flush synchronously
|
// Force a full export and flush synchronously
|
||||||
markDirtyAndScheduleFullExport()
|
flushToJSONLWithState(flushState{forceDirty: true, forceFullExport: true})
|
||||||
flushMutex.Lock()
|
|
||||||
if flushTimer != nil {
|
|
||||||
flushTimer.Stop()
|
|
||||||
flushTimer = nil
|
|
||||||
}
|
|
||||||
flushMutex.Unlock()
|
|
||||||
flushToJSONL()
|
|
||||||
|
|
||||||
jsonlPath := findJSONLPath()
|
jsonlPath := findJSONLPath()
|
||||||
data, err := os.ReadFile(jsonlPath)
|
data, err := os.ReadFile(jsonlPath)
|
||||||
|
|||||||
@@ -402,8 +402,6 @@ func TestFlushManagerIdempotentShutdown(t *testing.T) {
|
|||||||
func setupTestEnvironment(t *testing.T) {
|
func setupTestEnvironment(t *testing.T) {
|
||||||
autoFlushEnabled = true
|
autoFlushEnabled = true
|
||||||
storeActive = true
|
storeActive = true
|
||||||
isDirty = false
|
|
||||||
needsFullExport = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// teardownTestEnvironment cleans up test environment
|
// teardownTestEnvironment cleans up test environment
|
||||||
|
|||||||
@@ -375,7 +375,7 @@ NOTE: Import requires direct database access and does not work with daemon mode.
|
|||||||
// Without this, daemon FileWatcher won't detect the import for up to 30s
|
// Without this, daemon FileWatcher won't detect the import for up to 30s
|
||||||
// Only flush if there were actual changes to avoid unnecessary I/O
|
// Only flush if there were actual changes to avoid unnecessary I/O
|
||||||
if result.Created > 0 || result.Updated > 0 || len(result.IDMapping) > 0 {
|
if result.Created > 0 || result.Updated > 0 || len(result.IDMapping) > 0 {
|
||||||
flushToJSONL()
|
flushToJSONLWithState(flushState{forceDirty: true})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update jsonl_content_hash metadata to enable content-based staleness detection (bd-khnb fix)
|
// Update jsonl_content_hash metadata to enable content-based staleness detection (bd-khnb fix)
|
||||||
|
|||||||
@@ -74,17 +74,14 @@ var (
|
|||||||
rootCancel context.CancelFunc
|
rootCancel context.CancelFunc
|
||||||
|
|
||||||
// Auto-flush state
|
// Auto-flush state
|
||||||
autoFlushEnabled = true // Can be disabled with --no-auto-flush
|
autoFlushEnabled = true // Can be disabled with --no-auto-flush
|
||||||
isDirty = false // Tracks if DB has changes needing export (used by legacy code)
|
|
||||||
needsFullExport = false // Set to true when IDs change (used by legacy code)
|
|
||||||
flushMutex sync.Mutex
|
flushMutex sync.Mutex
|
||||||
flushTimer *time.Timer // DEPRECATED: Use flushManager instead
|
storeMutex sync.Mutex // Protects store access from background goroutine
|
||||||
storeMutex sync.Mutex // Protects store access from background goroutine
|
storeActive = false // Tracks if store is available
|
||||||
storeActive = false // Tracks if store is available
|
flushFailureCount = 0 // Consecutive flush failures
|
||||||
flushFailureCount = 0 // Consecutive flush failures
|
lastFlushError error // Last flush error for debugging
|
||||||
lastFlushError error // Last flush error for debugging
|
|
||||||
|
|
||||||
// Auto-flush manager (replaces timer-based approach to fix bd-52)
|
// Auto-flush manager (event-driven, fixes bd-52 race condition)
|
||||||
flushManager *FlushManager
|
flushManager *FlushManager
|
||||||
|
|
||||||
// Hook runner for extensibility (bd-kwro.8)
|
// Hook runner for extensibility (bd-kwro.8)
|
||||||
|
|||||||
@@ -16,9 +16,9 @@ import (
|
|||||||
"github.com/steveyegge/beads/internal/types"
|
"github.com/steveyegge/beads/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestAutoFlushOnExit tests that PersistentPostRun performs final flush before exit
|
// TestAutoFlushOnExit tests that FlushManager.Shutdown() performs final flush before exit
|
||||||
func TestAutoFlushOnExit(t *testing.T) {
|
func TestAutoFlushOnExit(t *testing.T) {
|
||||||
// FIX: Initialize rootCtx for flush operations
|
// Initialize rootCtx for flush operations
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@@ -48,15 +48,16 @@ func TestAutoFlushOnExit(t *testing.T) {
|
|||||||
storeActive = true
|
storeActive = true
|
||||||
storeMutex.Unlock()
|
storeMutex.Unlock()
|
||||||
|
|
||||||
// Reset auto-flush state
|
// Initialize FlushManager for this test (short debounce for testing)
|
||||||
autoFlushEnabled = true
|
autoFlushEnabled = true
|
||||||
isDirty = false
|
oldFlushManager := flushManager
|
||||||
if flushTimer != nil {
|
flushManager = NewFlushManager(true, 50*time.Millisecond)
|
||||||
flushTimer.Stop()
|
defer func() {
|
||||||
flushTimer = nil
|
if flushManager != nil {
|
||||||
}
|
_ = flushManager.Shutdown()
|
||||||
|
}
|
||||||
// ctx already declared above for rootCtx initialization
|
flushManager = oldFlushManager
|
||||||
|
}()
|
||||||
|
|
||||||
// Create test issue
|
// Create test issue
|
||||||
issue := &types.Issue{
|
issue := &types.Issue{
|
||||||
@@ -75,50 +76,12 @@ func TestAutoFlushOnExit(t *testing.T) {
|
|||||||
// Mark dirty (simulating CRUD operation)
|
// Mark dirty (simulating CRUD operation)
|
||||||
markDirtyAndScheduleFlush()
|
markDirtyAndScheduleFlush()
|
||||||
|
|
||||||
// Simulate PersistentPostRun (exit behavior)
|
// Simulate PersistentPostRun exit behavior - shutdown FlushManager
|
||||||
storeMutex.Lock()
|
// This performs the final flush before exit
|
||||||
storeActive = false
|
if err := flushManager.Shutdown(); err != nil {
|
||||||
storeMutex.Unlock()
|
t.Fatalf("FlushManager shutdown failed: %v", err)
|
||||||
|
|
||||||
flushMutex.Lock()
|
|
||||||
needsFlush := isDirty && autoFlushEnabled
|
|
||||||
if needsFlush {
|
|
||||||
if flushTimer != nil {
|
|
||||||
flushTimer.Stop()
|
|
||||||
flushTimer = nil
|
|
||||||
}
|
|
||||||
isDirty = false
|
|
||||||
}
|
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
if needsFlush {
|
|
||||||
// Manually perform flush logic (simulating PersistentPostRun)
|
|
||||||
storeMutex.Lock()
|
|
||||||
storeActive = true // Temporarily re-enable for this test
|
|
||||||
storeMutex.Unlock()
|
|
||||||
|
|
||||||
issues, err := testStore.SearchIssues(ctx, "", types.IssueFilter{})
|
|
||||||
if err == nil {
|
|
||||||
allDeps, _ := testStore.GetAllDependencyRecords(ctx)
|
|
||||||
for _, iss := range issues {
|
|
||||||
iss.Dependencies = allDeps[iss.ID]
|
|
||||||
}
|
|
||||||
tempPath := jsonlPath + ".tmp"
|
|
||||||
f, err := os.Create(tempPath)
|
|
||||||
if err == nil {
|
|
||||||
encoder := json.NewEncoder(f)
|
|
||||||
for _, iss := range issues {
|
|
||||||
encoder.Encode(iss)
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
os.Rename(tempPath, jsonlPath)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
storeMutex.Lock()
|
|
||||||
storeActive = false
|
|
||||||
storeMutex.Unlock()
|
|
||||||
}
|
}
|
||||||
|
flushManager = nil // Prevent double shutdown in defer
|
||||||
|
|
||||||
testStore.Close()
|
testStore.Close()
|
||||||
|
|
||||||
@@ -223,12 +186,8 @@ func TestAutoFlushJSONLContent(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark dirty and flush immediately
|
// Flush immediately (forces export)
|
||||||
flushMutex.Lock()
|
flushToJSONLWithState(flushState{forceDirty: true})
|
||||||
isDirty = true
|
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
flushToJSONL()
|
|
||||||
|
|
||||||
// Verify JSONL file exists
|
// Verify JSONL file exists
|
||||||
if _, err := os.Stat(expectedJSONLPath); os.IsNotExist(err) {
|
if _, err := os.Stat(expectedJSONLPath); os.IsNotExist(err) {
|
||||||
|
|||||||
Reference in New Issue
Block a user