diff --git a/cmd/bd/autoflush.go b/cmd/bd/autoflush.go
index f2b4dfda..30186ea6 100644
--- a/cmd/bd/autoflush.go
+++ b/cmd/bd/autoflush.go
@@ -18,6 +18,7 @@ import (
 	"github.com/steveyegge/beads/internal/beads"
 	"github.com/steveyegge/beads/internal/config"
 	"github.com/steveyegge/beads/internal/debug"
+	"github.com/steveyegge/beads/internal/storage"
 	"github.com/steveyegge/beads/internal/types"
 	"github.com/steveyegge/beads/internal/ui"
 	"github.com/steveyegge/beads/internal/utils"
@@ -462,6 +463,194 @@ func writeJSONLAtomic(jsonlPath string, issues []*types.Issue) ([]string, error)
 	return exportedIDs, nil
 }
 
+// recordFlushFailure records a flush failure, incrementing the failure counter
+// and displaying warnings after consecutive failures.
+func recordFlushFailure(err error) {
+	flushMutex.Lock()
+	flushFailureCount++
+	lastFlushError = err
+	failCount := flushFailureCount
+	flushMutex.Unlock()
+
+	// Always show the immediate warning
+	fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
+
+	// Show a prominent warning after 3+ consecutive failures
+	if failCount >= 3 {
+		fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
+		fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
+		fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
+	}
+}
+
+// recordFlushSuccess records a successful flush, resetting the failure counter.
+func recordFlushSuccess() {
+	flushMutex.Lock()
+	flushFailureCount = 0
+	lastFlushError = nil
+	flushMutex.Unlock()
+}
+
+// readExistingJSONL reads an existing JSONL file into a map for incremental merging.
+// Returns an empty map if the file does not exist; any other read error is returned.
+func readExistingJSONL(jsonlPath string) (map[string]*types.Issue, error) {
+	issueMap := make(map[string]*types.Issue)
+
+	existingFile, err := os.Open(jsonlPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return issueMap, nil // File doesn't exist, return empty map
+		}
+		return nil, fmt.Errorf("failed to open existing JSONL: %w", err)
+	}
+	defer existingFile.Close()
+
+	scanner := bufio.NewScanner(existingFile)
+	// Increase the buffer to handle large JSON lines: the default scanner
+	// limit is 64KB, and Scan fails with ErrTooLong on longer lines.
+	scanner.Buffer(make([]byte, 0, 1024), 2*1024*1024) // 2MB max line size
+
+	lineNum := 0
+	for scanner.Scan() {
+		lineNum++
+		line := scanner.Text()
+		if line == "" {
+			continue
+		}
+		var issue types.Issue
+		if err := json.Unmarshal([]byte(line), &issue); err == nil {
+			issue.SetDefaults() // Apply defaults for omitted fields (beads-399)
+			issueMap[issue.ID] = &issue
+		} else {
+			// Warn about malformed JSONL lines
+			fmt.Fprintf(os.Stderr, "Warning: skipping malformed JSONL line %d: %v\n", lineNum, err)
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("failed to read existing JSONL: %w", err)
+	}
+
+	return issueMap, nil
+}
+
+// fetchAndMergeIssues fetches dirty issues from the database and merges them into issueMap.
+// Issues that no longer exist are removed from the map.
+func fetchAndMergeIssues(ctx context.Context, s storage.Storage, dirtyIDs []string, issueMap map[string]*types.Issue) error {
+	for _, issueID := range dirtyIDs {
+		issue, err := s.GetIssue(ctx, issueID)
+		if err != nil {
+			return fmt.Errorf("failed to get issue %s: %w", issueID, err)
+		}
+		if issue == nil {
+			// Issue was deleted, remove from map
+			delete(issueMap, issueID)
+			continue
+		}
+
+		// Get dependencies for this issue
+		deps, err := s.GetDependencyRecords(ctx, issueID)
+		if err != nil {
+			return fmt.Errorf("failed to get dependencies for %s: %w", issueID, err)
+		}
+		issue.Dependencies = deps
+
+		// Update map
+		issueMap[issueID] = issue
+	}
+	return nil
+}
+
+// filterWisps removes ephemeral (wisp) issues from the map and returns a slice.
+// Wisps should never be exported to JSONL.
+func filterWisps(issueMap map[string]*types.Issue) []*types.Issue {
+	issues := make([]*types.Issue, 0, len(issueMap))
+	wispsSkipped := 0
+	for _, issue := range issueMap {
+		if issue.Ephemeral {
+			wispsSkipped++
+			continue
+		}
+		issues = append(issues, issue)
+	}
+	if wispsSkipped > 0 {
+		debug.Logf("auto-flush: filtered %d wisps from export", wispsSkipped)
+	}
+	return issues
+}
+
+// filterByMultiRepoPrefix filters issues by prefix in multi-repo mode.
+// Non-primary repos should only export issues matching their own prefix.
+func filterByMultiRepoPrefix(ctx context.Context, s storage.Storage, issues []*types.Issue) []*types.Issue {
+	multiRepo := config.GetMultiRepoConfig()
+	if multiRepo == nil {
+		return issues
+	}
+
+	// Get our configured prefix
+	prefix, prefixErr := s.GetConfig(ctx, "issue_prefix")
+	if prefixErr != nil || prefix == "" {
+		return issues
+	}
+
+	// Determine if we're the primary repo
+	cwd, _ := os.Getwd()
+	primaryPath := multiRepo.Primary
+	if primaryPath == "" || primaryPath == "." {
+		primaryPath = cwd
+	}
+
+	// Normalize paths for comparison
+	absCwd, _ := filepath.Abs(cwd)
+	absPrimary, _ := filepath.Abs(primaryPath)
+
+	if absCwd == absPrimary {
+		return issues // Primary repo exports all issues
+	}
+
+	// Filter to only issues matching our prefix
+	filtered := make([]*types.Issue, 0, len(issues))
+	prefixWithDash := prefix
+	if !strings.HasSuffix(prefixWithDash, "-") {
+		prefixWithDash = prefix + "-"
+	}
+	for _, issue := range issues {
+		if strings.HasPrefix(issue.ID, prefixWithDash) {
+			filtered = append(filtered, issue)
+		}
+	}
+	debug.Logf("multi-repo filter: %d issues -> %d (prefix %s)", len(issues), len(filtered), prefix)
+	return filtered
+}
+
+// updateFlushExportMetadata stores hashes and timestamps after a successful flush export.
+func updateFlushExportMetadata(ctx context.Context, s storage.Storage, jsonlPath string) {
+	jsonlData, err := os.ReadFile(jsonlPath)
+	if err != nil {
+		return // Non-fatal, just skip metadata update
+	}
+
+	hasher := sha256.New()
+	hasher.Write(jsonlData)
+	exportedHash := hex.EncodeToString(hasher.Sum(nil))
+
+	if err := s.SetMetadata(ctx, "jsonl_content_hash", exportedHash); err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_content_hash after export: %v\n", err)
+	}
+
+	// Store JSONL file hash for integrity validation
+	if err := s.SetJSONLFileHash(ctx, exportedHash); err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_file_hash after export: %v\n", err)
+	}
+
+	// Update last_import_time so staleness check doesn't see JSONL as "newer" (fixes #399)
+	// Use RFC3339Nano to preserve nanosecond precision.
+	exportTime := time.Now().Format(time.RFC3339Nano)
+	if err := s.SetMetadata(ctx, "last_import_time", exportTime); err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: failed to update last_import_time after export: %v\n", err)
+	}
+}
+
 // flushState captures the state needed for a flush operation
 type flushState struct {
 	forceDirty bool // Force flush even if isDirty is false
@@ -507,30 +696,13 @@ func flushToJSONLWithState(state flushState) {
 	storeMutex.Unlock()
 
 	ctx := rootCtx
-
+
 	// Validate JSONL integrity BEFORE checking isDirty
 	// This detects if JSONL and export_hashes are out of sync (e.g., after git operations)
-	// If export_hashes was cleared, we need to do a full export even if nothing is dirty
 	integrityNeedsFullExport, err := validateJSONLIntegrity(ctx, jsonlPath)
 	if err != nil {
-		// Special case: missing JSONL is not fatal, just forces full export
 		if !os.IsNotExist(err) {
-			// Record failure without clearing isDirty (we didn't do any work yet)
-			flushMutex.Lock()
-			flushFailureCount++
-			lastFlushError = err
-			failCount := flushFailureCount
-			flushMutex.Unlock()
-
-			// Always show the immediate warning
-			fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
-
-			// Show prominent warning after 3+ consecutive failures
-			if failCount >= 3 {
-				fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
-				fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
-				fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
-			}
+			recordFlushFailure(err)
 			return
 		}
 		// Missing JSONL: treat as "force full export" case
@@ -538,235 +710,86 @@
 	}
 
 	// Check if we should proceed with export
-	// Use only the state parameter - don't read global flags
-	// Caller is responsible for passing correct forceDirty/forceFullExport values
 	if !state.forceDirty && !integrityNeedsFullExport {
-		// Nothing to do: not forced and no integrity issue
 		return
 	}
 
 	// Determine export mode
 	fullExport := state.forceFullExport || integrityNeedsFullExport
 
-	// Helper to record failure
-	recordFailure := func(err error) {
-		flushMutex.Lock()
-		flushFailureCount++
-		lastFlushError = err
-		failCount := flushFailureCount
-		flushMutex.Unlock()
-
-		// Always show the immediate warning
-		fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
-
-		// Show prominent warning after 3+ consecutive failures
-		if failCount >= 3 {
-			fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
-			fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
-			fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
-		}
-	}
-
-	// Helper to record success
-	recordSuccess := func() {
-		flushMutex.Lock()
-		flushFailureCount = 0
-		lastFlushError = nil
-		flushMutex.Unlock()
-	}
-
-	// Determine which issues to export
-	var dirtyIDs []string
-
-	if fullExport {
-		// Full export: get ALL issues (needed after ID-changing operations like renumber)
-		allIssues, err2 := store.SearchIssues(ctx, "", types.IssueFilter{})
-		if err2 != nil {
-			recordFailure(fmt.Errorf("failed to get all issues: %w", err2))
-			return
-		}
-		dirtyIDs = make([]string, len(allIssues))
-		for i, issue := range allIssues {
-			dirtyIDs[i] = issue.ID
-		}
-	} else {
-		// Incremental export: get only dirty issue IDs
-		var err2 error
-		dirtyIDs, err2 = store.GetDirtyIssues(ctx)
-		if err2 != nil {
-			recordFailure(fmt.Errorf("failed to get dirty issues: %w", err2))
-			return
-		}
-
-		// No dirty issues? Nothing to do!
-		if len(dirtyIDs) == 0 {
-			recordSuccess()
-			return
-		}
-	}
-
-	// Read existing JSONL into a map (skip for full export - we'll rebuild from scratch)
-	issueMap := make(map[string]*types.Issue)
-	if !fullExport {
-		if existingFile, err := os.Open(jsonlPath); err == nil {
-			scanner := bufio.NewScanner(existingFile)
-			// Increase buffer to handle large JSON lines
-			// Default scanner limit is 64KB which can cause silent truncation
-			scanner.Buffer(make([]byte, 0, 1024), 2*1024*1024) // 2MB max line size
-			lineNum := 0
-			for scanner.Scan() {
-				lineNum++
-				line := scanner.Text()
-				if line == "" {
-					continue
-				}
-				var issue types.Issue
-				if err := json.Unmarshal([]byte(line), &issue); err == nil {
-					issue.SetDefaults() // Apply defaults for omitted fields (beads-399)
-					issueMap[issue.ID] = &issue
-				} else {
-					// Warn about malformed JSONL lines
-					fmt.Fprintf(os.Stderr, "Warning: skipping malformed JSONL line %d: %v\n", lineNum, err)
-				}
-			}
-			// Check for scanner errors
-			if err := scanner.Err(); err != nil {
-				_ = existingFile.Close()
-				recordFailure(fmt.Errorf("failed to read existing JSONL: %w", err))
-				return
-			}
-			_ = existingFile.Close()
-		}
-	}
-
-	// Fetch only dirty issues from DB
-	for _, issueID := range dirtyIDs {
-		issue, err := store.GetIssue(ctx, issueID)
-		if err != nil {
-			recordFailure(fmt.Errorf("failed to get issue %s: %w", issueID, err))
-			return
-		}
-		if issue == nil {
-			// Issue was deleted, remove from map
-			delete(issueMap, issueID)
-			continue
-		}
-
-		// Get dependencies for this issue
-		deps, err := store.GetDependencyRecords(ctx, issueID)
-		if err != nil {
-			recordFailure(fmt.Errorf("failed to get dependencies for %s: %w", issueID, err))
-			return
-		}
-		issue.Dependencies = deps
-
-		// Update map
-		issueMap[issueID] = issue
-	}
-
-	// Convert map to slice (will be sorted by writeJSONLAtomic)
-	// Filter out wisps - they should never be exported to JSONL
-	// Wisps exist only in SQLite and are shared via .beads/redirect, not JSONL.
-	// This prevents "zombie" issues that resurrect after mol squash deletes them.
-	issues := make([]*types.Issue, 0, len(issueMap))
-	wispsSkipped := 0
-	for _, issue := range issueMap {
-		if issue.Ephemeral {
-			wispsSkipped++
-			continue
-		}
-		issues = append(issues, issue)
-	}
-	if wispsSkipped > 0 {
-		debug.Logf("auto-flush: filtered %d wisps from export", wispsSkipped)
-	}
-
-	// Filter issues by prefix in multi-repo mode for non-primary repos (fixes GH #437)
-	// In multi-repo mode, non-primary repos should only export issues that match
-	// their own prefix. Issues from other repos (hydrated for unified view) should
-	// NOT be written to the local JSONL.
-	multiRepo := config.GetMultiRepoConfig()
-	if multiRepo != nil {
-		// Get our configured prefix
-		prefix, prefixErr := store.GetConfig(ctx, "issue_prefix")
-		if prefixErr == nil && prefix != "" {
-			// Determine if we're the primary repo
-			cwd, _ := os.Getwd()
-			primaryPath := multiRepo.Primary
-			if primaryPath == "" || primaryPath == "." {
-				primaryPath = cwd
-			}
-
-			// Normalize paths for comparison
-			absCwd, _ := filepath.Abs(cwd)
-			absPrimary, _ := filepath.Abs(primaryPath)
-
-			isPrimary := absCwd == absPrimary
-
-			if !isPrimary {
-				// Filter to only issues matching our prefix
-				filtered := make([]*types.Issue, 0, len(issues))
-				prefixWithDash := prefix
-				if !strings.HasSuffix(prefixWithDash, "-") {
-					prefixWithDash = prefix + "-"
-				}
-				for _, issue := range issues {
-					if strings.HasPrefix(issue.ID, prefixWithDash) {
-						filtered = append(filtered, issue)
-					}
-				}
-				debug.Logf("multi-repo filter: %d issues -> %d (prefix %s)", len(issues), len(filtered), prefix)
-				issues = filtered
-			}
-		}
-	}
-
-	// Write atomically using common helper
-	exportedIDs, err := writeJSONLAtomic(jsonlPath, issues)
+	dirtyIDs, err := getIssuesToExport(ctx, fullExport)
 	if err != nil {
-		recordFailure(err)
+		recordFlushFailure(err)
+		return
+	}
+	if len(dirtyIDs) == 0 && !fullExport {
+		recordFlushSuccess()
 		return
 	}
 
-	// Clear only the dirty issues that were actually exported (fixes race condition)
-	// Don't clear issues that were skipped due to timestamp-only changes
+	// Read existing JSONL into a map (skip for full export - we'll rebuild from scratch)
+	var issueMap map[string]*types.Issue
+	if fullExport {
+		issueMap = make(map[string]*types.Issue)
+	} else {
+		issueMap, err = readExistingJSONL(jsonlPath)
+		if err != nil {
+			recordFlushFailure(err)
+			return
+		}
+	}
+
+	// Fetch dirty issues from DB and merge into map
+	if err := fetchAndMergeIssues(ctx, store, dirtyIDs, issueMap); err != nil {
+		recordFlushFailure(err)
+		return
+	}
+
+	// Convert map to slice, filtering out wisps
+	issues := filterWisps(issueMap)
+
+	// Filter by prefix in multi-repo mode
+	issues = filterByMultiRepoPrefix(ctx, store, issues)
+
+	// Write atomically
+	exportedIDs, err := writeJSONLAtomic(jsonlPath, issues)
+	if err != nil {
+		recordFlushFailure(err)
+		return
+	}
+
+	// Clear dirty issues that were exported
 	if len(exportedIDs) > 0 {
 		if err := store.ClearDirtyIssuesByID(ctx, exportedIDs); err != nil {
-			// Don't fail the whole flush for this, but warn
 			fmt.Fprintf(os.Stderr, "Warning: failed to clear dirty issues: %v\n", err)
 		}
 	}
 
-	// Store hash of exported JSONL (enables hash-based auto-import)
-	// Renamed from last_import_hash to jsonl_content_hash
-	jsonlData, err := os.ReadFile(jsonlPath)
-	if err == nil {
-		hasher := sha256.New()
-		hasher.Write(jsonlData)
-		exportedHash := hex.EncodeToString(hasher.Sum(nil))
-		if err := store.SetMetadata(ctx, "jsonl_content_hash", exportedHash); err != nil {
-			fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_content_hash after export: %v\n", err)
-		}
+	// Update metadata (hashes, timestamps)
+	updateFlushExportMetadata(ctx, store, jsonlPath)
 
-		// Store JSONL file hash for integrity validation
-		if err := store.SetJSONLFileHash(ctx, exportedHash); err != nil {
-			fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_file_hash after export: %v\n", err)
-		}
+	recordFlushSuccess()
+}
 
-		// Update last_import_time so staleness check doesn't see JSONL as "newer" (fixes #399)
-		// CheckStaleness() compares last_import_time against JSONL mtime. After export,
-		// the JSONL mtime is updated, so we must also update last_import_time to prevent
-		// false "stale" detection on subsequent reads.
-		//
-		// Use RFC3339Nano to preserve nanosecond precision. The file mtime has nanosecond
-		// precision, so using RFC3339 (second precision) would cause the stored time to be
-		// slightly earlier than the file mtime, triggering false staleness.
-		exportTime := time.Now().Format(time.RFC3339Nano)
-		if err := store.SetMetadata(ctx, "last_import_time", exportTime); err != nil {
-			fmt.Fprintf(os.Stderr, "Warning: failed to update last_import_time after export: %v\n", err)
+
+// getIssuesToExport determines which issue IDs need to be exported.
+// For full export, returns all issue IDs. For incremental, returns only dirty IDs.
+func getIssuesToExport(ctx context.Context, fullExport bool) ([]string, error) {
+	if fullExport {
+		allIssues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
+		if err != nil {
+			return nil, fmt.Errorf("failed to get all issues: %w", err)
 		}
+		ids := make([]string, len(allIssues))
+		for i, issue := range allIssues {
+			ids[i] = issue.ID
+		}
+		return ids, nil
 	}
 
-	// Success! FlushManager manages its local state in run() goroutine.
-	recordSuccess()
+	dirtyIDs, err := store.GetDirtyIssues(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get dirty issues: %w", err)
+	}
+	return dirtyIDs, nil
 }
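The extracted recordFlushFailure/recordFlushSuccess pair keeps the contract of the inlined originals: every failure prints the one-line warning, the CRITICAL banner fires only from the third consecutive failure onward, and any success resets the streak. A minimal standalone sketch of that contract (local names standing in for the package globals, stdout instead of stderr; not part of the patch):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	mu    sync.Mutex
	fails int
)

func recordFailure(err error) {
	mu.Lock()
	fails++
	n := fails
	mu.Unlock()
	fmt.Printf("warning: flush failed: %v\n", err)
	if n >= 3 {
		fmt.Printf("CRITICAL: %d consecutive failures\n", n)
	}
}

func recordSuccess() {
	mu.Lock()
	fails = 0
	mu.Unlock()
}

func main() {
	err := errors.New("disk full")
	recordFailure(err)
	recordFailure(err)
	recordSuccess()    // success resets the streak
	recordFailure(err) // count restarts at 1: no banner
	recordFailure(err)
	recordFailure(err) // third in a row: banner fires
}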
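On the scanner sizing in readExistingJSONL: bufio.Scanner's default token limit is 64KB (bufio.MaxScanTokenSize), and a longer line does not come back truncated; Scan returns false with scanner.Err() == bufio.ErrTooLong, so a loop that never checks Err would quietly stop partway through the file. A standalone sketch of the failure mode and the fix (not part of the patch):

package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

func main() {
	line := strings.Repeat("x", 100*1024) // one 100KB line, larger than the 64KB default

	s := bufio.NewScanner(strings.NewReader(line))
	fmt.Println(s.Scan(), errors.Is(s.Err(), bufio.ErrTooLong)) // false true

	s = bufio.NewScanner(strings.NewReader(line))
	s.Buffer(make([]byte, 0, 1024), 2*1024*1024) // same sizing as the patch
	fmt.Println(s.Scan(), s.Err())               // true <nil>
}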
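The dash appended in filterByMultiRepoPrefix is what makes the match exact: filtering on the bare prefix would also sweep in issues from any repo whose prefix merely starts with ours. A standalone sketch with hypothetical issue IDs (not part of the patch):

package main

import (
	"fmt"
	"strings"
)

func main() {
	ids := []string{"bd-1", "bdx-7", "core-3"} // hypothetical IDs
	prefix := "bd"
	if !strings.HasSuffix(prefix, "-") {
		prefix += "-" // "bd-": keeps "bdx-7" from matching
	}
	for _, id := range ids {
		fmt.Println(id, strings.HasPrefix(id, prefix))
	}
	// bd-1 true, bdx-7 false, core-3 false
}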
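updateFlushExportMetadata computes one SHA-256 hex digest of the exported file and stores it under both jsonl_content_hash and the JSONL file-hash slot, so either consumer can compare bytes-on-disk against the last export. The digest computation, sketched standalone (not part of the patch); sha256.Sum256 is the one-shot equivalent of the New/Write/Sum sequence the patch uses:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

func main() {
	data := []byte(`{"id":"bd-1"}` + "\n") // stands in for the exported JSONL bytes

	hasher := sha256.New()
	hasher.Write(data)
	fmt.Println(hex.EncodeToString(hasher.Sum(nil)))

	sum := sha256.Sum256(data) // one-shot form, same digest
	fmt.Println(hex.EncodeToString(sum[:]))
}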
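On the RFC3339Nano comment in updateFlushExportMetadata: the time.RFC3339 layout carries no fractional-second digits, so a timestamp stored in that format parses back strictly earlier than the file's nanosecond-precision mtime, and the staleness check would misfire. A standalone sketch of the round-trip difference (not part of the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	mtime := time.Now() // stands in for the JSONL file's mtime

	coarse, _ := time.Parse(time.RFC3339, mtime.Format(time.RFC3339))
	fmt.Println(coarse.Before(mtime)) // almost always true: sub-second digits dropped

	fine, _ := time.Parse(time.RFC3339Nano, mtime.Format(time.RFC3339Nano))
	fmt.Println(fine.Before(mtime)) // false: nanoseconds survive the round trip
}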