refactor: Break up 280-line flushToJSONLWithState into focused helpers (bd-9hc9)
Extracted 8 helper functions from the monolithic flushToJSONLWithState: - recordFlushFailure/recordFlushSuccess: Failure tracking and counter management - readExistingJSONL: Parse existing JSONL file for incremental merging - fetchAndMergeIssues: Fetch dirty issues from DB and merge into map - filterWisps: Remove ephemeral (wisp) issues from export - filterByMultiRepoPrefix: Multi-repo prefix filtering for non-primary repos - updateFlushExportMetadata: Store hashes and timestamps after export - getIssuesToExport: Determine full vs incremental export issue list Main function now reads as a clear pipeline: 1. Validate integrity -> 2. Get issues -> 3. Read existing JSONL 4. Merge from DB -> 5. Filter wisps -> 6. Filter by prefix 7. Write atomically -> 8. Update metadata Benefits: - Each helper is single-responsibility and testable - Main function reduced from ~280 to ~94 lines - Logic is clearly separated and documented - Easier to understand and maintain 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/steveyegge/beads/internal/beads"
|
"github.com/steveyegge/beads/internal/beads"
|
||||||
"github.com/steveyegge/beads/internal/config"
|
"github.com/steveyegge/beads/internal/config"
|
||||||
"github.com/steveyegge/beads/internal/debug"
|
"github.com/steveyegge/beads/internal/debug"
|
||||||
|
"github.com/steveyegge/beads/internal/storage"
|
||||||
"github.com/steveyegge/beads/internal/types"
|
"github.com/steveyegge/beads/internal/types"
|
||||||
"github.com/steveyegge/beads/internal/ui"
|
"github.com/steveyegge/beads/internal/ui"
|
||||||
"github.com/steveyegge/beads/internal/utils"
|
"github.com/steveyegge/beads/internal/utils"
|
||||||
@@ -462,6 +463,194 @@ func writeJSONLAtomic(jsonlPath string, issues []*types.Issue) ([]string, error)
|
|||||||
return exportedIDs, nil
|
return exportedIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// recordFlushFailure records a flush failure, incrementing the failure counter
|
||||||
|
// and displaying warnings after consecutive failures.
|
||||||
|
func recordFlushFailure(err error) {
|
||||||
|
flushMutex.Lock()
|
||||||
|
flushFailureCount++
|
||||||
|
lastFlushError = err
|
||||||
|
failCount := flushFailureCount
|
||||||
|
flushMutex.Unlock()
|
||||||
|
|
||||||
|
// Always show the immediate warning
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
|
||||||
|
|
||||||
|
// Show prominent warning after 3+ consecutive failures
|
||||||
|
if failCount >= 3 {
|
||||||
|
fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
|
||||||
|
fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
|
||||||
|
fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordFlushSuccess records a successful flush, resetting the failure counter.
|
||||||
|
func recordFlushSuccess() {
|
||||||
|
flushMutex.Lock()
|
||||||
|
flushFailureCount = 0
|
||||||
|
lastFlushError = nil
|
||||||
|
flushMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// readExistingJSONL reads an existing JSONL file into a map for incremental merging.
|
||||||
|
// Returns empty map if file doesn't exist or can't be read.
|
||||||
|
func readExistingJSONL(jsonlPath string) (map[string]*types.Issue, error) {
|
||||||
|
issueMap := make(map[string]*types.Issue)
|
||||||
|
|
||||||
|
existingFile, err := os.Open(jsonlPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return issueMap, nil // File doesn't exist, return empty map
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("failed to open existing JSONL: %w", err)
|
||||||
|
}
|
||||||
|
defer existingFile.Close()
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(existingFile)
|
||||||
|
// Increase buffer to handle large JSON lines
|
||||||
|
// Default scanner limit is 64KB which can cause silent truncation
|
||||||
|
scanner.Buffer(make([]byte, 0, 1024), 2*1024*1024) // 2MB max line size
|
||||||
|
|
||||||
|
lineNum := 0
|
||||||
|
for scanner.Scan() {
|
||||||
|
lineNum++
|
||||||
|
line := scanner.Text()
|
||||||
|
if line == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var issue types.Issue
|
||||||
|
if err := json.Unmarshal([]byte(line), &issue); err == nil {
|
||||||
|
issue.SetDefaults() // Apply defaults for omitted fields (beads-399)
|
||||||
|
issueMap[issue.ID] = &issue
|
||||||
|
} else {
|
||||||
|
// Warn about malformed JSONL lines
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: skipping malformed JSONL line %d: %v\n", lineNum, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read existing JSONL: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return issueMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchAndMergeIssues fetches dirty issues from the database and merges them into issueMap.
|
||||||
|
// Issues that no longer exist are removed from the map.
|
||||||
|
func fetchAndMergeIssues(ctx context.Context, s storage.Storage, dirtyIDs []string, issueMap map[string]*types.Issue) error {
|
||||||
|
for _, issueID := range dirtyIDs {
|
||||||
|
issue, err := s.GetIssue(ctx, issueID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get issue %s: %w", issueID, err)
|
||||||
|
}
|
||||||
|
if issue == nil {
|
||||||
|
// Issue was deleted, remove from map
|
||||||
|
delete(issueMap, issueID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get dependencies for this issue
|
||||||
|
deps, err := s.GetDependencyRecords(ctx, issueID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get dependencies for %s: %w", issueID, err)
|
||||||
|
}
|
||||||
|
issue.Dependencies = deps
|
||||||
|
|
||||||
|
// Update map
|
||||||
|
issueMap[issueID] = issue
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterWisps removes ephemeral (wisp) issues from the map and returns a slice.
|
||||||
|
// Wisps should never be exported to JSONL.
|
||||||
|
func filterWisps(issueMap map[string]*types.Issue) []*types.Issue {
|
||||||
|
issues := make([]*types.Issue, 0, len(issueMap))
|
||||||
|
wispsSkipped := 0
|
||||||
|
for _, issue := range issueMap {
|
||||||
|
if issue.Ephemeral {
|
||||||
|
wispsSkipped++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
issues = append(issues, issue)
|
||||||
|
}
|
||||||
|
if wispsSkipped > 0 {
|
||||||
|
debug.Logf("auto-flush: filtered %d wisps from export", wispsSkipped)
|
||||||
|
}
|
||||||
|
return issues
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterByMultiRepoPrefix filters issues by prefix in multi-repo mode.
|
||||||
|
// Non-primary repos should only export issues matching their own prefix.
|
||||||
|
func filterByMultiRepoPrefix(ctx context.Context, s storage.Storage, issues []*types.Issue) []*types.Issue {
|
||||||
|
multiRepo := config.GetMultiRepoConfig()
|
||||||
|
if multiRepo == nil {
|
||||||
|
return issues
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get our configured prefix
|
||||||
|
prefix, prefixErr := s.GetConfig(ctx, "issue_prefix")
|
||||||
|
if prefixErr != nil || prefix == "" {
|
||||||
|
return issues
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine if we're the primary repo
|
||||||
|
cwd, _ := os.Getwd()
|
||||||
|
primaryPath := multiRepo.Primary
|
||||||
|
if primaryPath == "" || primaryPath == "." {
|
||||||
|
primaryPath = cwd
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normalize paths for comparison
|
||||||
|
absCwd, _ := filepath.Abs(cwd)
|
||||||
|
absPrimary, _ := filepath.Abs(primaryPath)
|
||||||
|
|
||||||
|
if absCwd == absPrimary {
|
||||||
|
return issues // Primary repo exports all issues
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter to only issues matching our prefix
|
||||||
|
filtered := make([]*types.Issue, 0, len(issues))
|
||||||
|
prefixWithDash := prefix
|
||||||
|
if !strings.HasSuffix(prefixWithDash, "-") {
|
||||||
|
prefixWithDash = prefix + "-"
|
||||||
|
}
|
||||||
|
for _, issue := range issues {
|
||||||
|
if strings.HasPrefix(issue.ID, prefixWithDash) {
|
||||||
|
filtered = append(filtered, issue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug.Logf("multi-repo filter: %d issues -> %d (prefix %s)", len(issues), len(filtered), prefix)
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateFlushExportMetadata stores hashes and timestamps after a successful flush export.
|
||||||
|
func updateFlushExportMetadata(ctx context.Context, s storage.Storage, jsonlPath string) {
|
||||||
|
jsonlData, err := os.ReadFile(jsonlPath)
|
||||||
|
if err != nil {
|
||||||
|
return // Non-fatal, just skip metadata update
|
||||||
|
}
|
||||||
|
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write(jsonlData)
|
||||||
|
exportedHash := hex.EncodeToString(hasher.Sum(nil))
|
||||||
|
|
||||||
|
if err := s.SetMetadata(ctx, "jsonl_content_hash", exportedHash); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_content_hash after export: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store JSONL file hash for integrity validation
|
||||||
|
if err := s.SetJSONLFileHash(ctx, exportedHash); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_file_hash after export: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update last_import_time so staleness check doesn't see JSONL as "newer" (fixes #399)
|
||||||
|
// Use RFC3339Nano to preserve nanosecond precision.
|
||||||
|
exportTime := time.Now().Format(time.RFC3339Nano)
|
||||||
|
if err := s.SetMetadata(ctx, "last_import_time", exportTime); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to update last_import_time after export: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// flushState captures the state needed for a flush operation
|
// flushState captures the state needed for a flush operation
|
||||||
type flushState struct {
|
type flushState struct {
|
||||||
forceDirty bool // Force flush even if isDirty is false
|
forceDirty bool // Force flush even if isDirty is false
|
||||||
@@ -507,30 +696,13 @@ func flushToJSONLWithState(state flushState) {
|
|||||||
storeMutex.Unlock()
|
storeMutex.Unlock()
|
||||||
|
|
||||||
ctx := rootCtx
|
ctx := rootCtx
|
||||||
|
|
||||||
// Validate JSONL integrity BEFORE checking isDirty
|
// Validate JSONL integrity BEFORE checking isDirty
|
||||||
// This detects if JSONL and export_hashes are out of sync (e.g., after git operations)
|
// This detects if JSONL and export_hashes are out of sync (e.g., after git operations)
|
||||||
// If export_hashes was cleared, we need to do a full export even if nothing is dirty
|
|
||||||
integrityNeedsFullExport, err := validateJSONLIntegrity(ctx, jsonlPath)
|
integrityNeedsFullExport, err := validateJSONLIntegrity(ctx, jsonlPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Special case: missing JSONL is not fatal, just forces full export
|
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
// Record failure without clearing isDirty (we didn't do any work yet)
|
recordFlushFailure(err)
|
||||||
flushMutex.Lock()
|
|
||||||
flushFailureCount++
|
|
||||||
lastFlushError = err
|
|
||||||
failCount := flushFailureCount
|
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
// Always show the immediate warning
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
|
|
||||||
|
|
||||||
// Show prominent warning after 3+ consecutive failures
|
|
||||||
if failCount >= 3 {
|
|
||||||
fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
|
|
||||||
fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
|
|
||||||
fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Missing JSONL: treat as "force full export" case
|
// Missing JSONL: treat as "force full export" case
|
||||||
@@ -538,235 +710,86 @@ func flushToJSONLWithState(state flushState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check if we should proceed with export
|
// Check if we should proceed with export
|
||||||
// Use only the state parameter - don't read global flags
|
|
||||||
// Caller is responsible for passing correct forceDirty/forceFullExport values
|
|
||||||
if !state.forceDirty && !integrityNeedsFullExport {
|
if !state.forceDirty && !integrityNeedsFullExport {
|
||||||
// Nothing to do: not forced and no integrity issue
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine export mode
|
// Determine export mode
|
||||||
fullExport := state.forceFullExport || integrityNeedsFullExport
|
fullExport := state.forceFullExport || integrityNeedsFullExport
|
||||||
|
|
||||||
// Helper to record failure
|
|
||||||
recordFailure := func(err error) {
|
|
||||||
flushMutex.Lock()
|
|
||||||
flushFailureCount++
|
|
||||||
lastFlushError = err
|
|
||||||
failCount := flushFailureCount
|
|
||||||
flushMutex.Unlock()
|
|
||||||
|
|
||||||
// Always show the immediate warning
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: auto-flush failed: %v\n", err)
|
|
||||||
|
|
||||||
// Show prominent warning after 3+ consecutive failures
|
|
||||||
if failCount >= 3 {
|
|
||||||
fmt.Fprintf(os.Stderr, "\n%s\n", ui.RenderFail("⚠️ CRITICAL: Auto-flush has failed "+fmt.Sprint(failCount)+" times consecutively!"))
|
|
||||||
fmt.Fprintf(os.Stderr, "%s\n", ui.RenderFail("⚠️ Your JSONL file may be out of sync with the database."))
|
|
||||||
fmt.Fprintf(os.Stderr, "%s\n\n", ui.RenderFail("⚠️ Run 'bd export -o .beads/issues.jsonl' manually to fix."))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper to record success
|
|
||||||
recordSuccess := func() {
|
|
||||||
flushMutex.Lock()
|
|
||||||
flushFailureCount = 0
|
|
||||||
lastFlushError = nil
|
|
||||||
flushMutex.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine which issues to export
|
// Determine which issues to export
|
||||||
var dirtyIDs []string
|
dirtyIDs, err := getIssuesToExport(ctx, fullExport)
|
||||||
|
|
||||||
if fullExport {
|
|
||||||
// Full export: get ALL issues (needed after ID-changing operations like renumber)
|
|
||||||
allIssues, err2 := store.SearchIssues(ctx, "", types.IssueFilter{})
|
|
||||||
if err2 != nil {
|
|
||||||
recordFailure(fmt.Errorf("failed to get all issues: %w", err2))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
dirtyIDs = make([]string, len(allIssues))
|
|
||||||
for i, issue := range allIssues {
|
|
||||||
dirtyIDs[i] = issue.ID
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Incremental export: get only dirty issue IDs
|
|
||||||
var err2 error
|
|
||||||
dirtyIDs, err2 = store.GetDirtyIssues(ctx)
|
|
||||||
if err2 != nil {
|
|
||||||
recordFailure(fmt.Errorf("failed to get dirty issues: %w", err2))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// No dirty issues? Nothing to do!
|
|
||||||
if len(dirtyIDs) == 0 {
|
|
||||||
recordSuccess()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read existing JSONL into a map (skip for full export - we'll rebuild from scratch)
|
|
||||||
issueMap := make(map[string]*types.Issue)
|
|
||||||
if !fullExport {
|
|
||||||
if existingFile, err := os.Open(jsonlPath); err == nil {
|
|
||||||
scanner := bufio.NewScanner(existingFile)
|
|
||||||
// Increase buffer to handle large JSON lines
|
|
||||||
// Default scanner limit is 64KB which can cause silent truncation
|
|
||||||
scanner.Buffer(make([]byte, 0, 1024), 2*1024*1024) // 2MB max line size
|
|
||||||
lineNum := 0
|
|
||||||
for scanner.Scan() {
|
|
||||||
lineNum++
|
|
||||||
line := scanner.Text()
|
|
||||||
if line == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var issue types.Issue
|
|
||||||
if err := json.Unmarshal([]byte(line), &issue); err == nil {
|
|
||||||
issue.SetDefaults() // Apply defaults for omitted fields (beads-399)
|
|
||||||
issueMap[issue.ID] = &issue
|
|
||||||
} else {
|
|
||||||
// Warn about malformed JSONL lines
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: skipping malformed JSONL line %d: %v\n", lineNum, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Check for scanner errors
|
|
||||||
if err := scanner.Err(); err != nil {
|
|
||||||
_ = existingFile.Close()
|
|
||||||
recordFailure(fmt.Errorf("failed to read existing JSONL: %w", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = existingFile.Close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch only dirty issues from DB
|
|
||||||
for _, issueID := range dirtyIDs {
|
|
||||||
issue, err := store.GetIssue(ctx, issueID)
|
|
||||||
if err != nil {
|
|
||||||
recordFailure(fmt.Errorf("failed to get issue %s: %w", issueID, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if issue == nil {
|
|
||||||
// Issue was deleted, remove from map
|
|
||||||
delete(issueMap, issueID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get dependencies for this issue
|
|
||||||
deps, err := store.GetDependencyRecords(ctx, issueID)
|
|
||||||
if err != nil {
|
|
||||||
recordFailure(fmt.Errorf("failed to get dependencies for %s: %w", issueID, err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
issue.Dependencies = deps
|
|
||||||
|
|
||||||
// Update map
|
|
||||||
issueMap[issueID] = issue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert map to slice (will be sorted by writeJSONLAtomic)
|
|
||||||
// Filter out wisps - they should never be exported to JSONL
|
|
||||||
// Wisps exist only in SQLite and are shared via .beads/redirect, not JSONL.
|
|
||||||
// This prevents "zombie" issues that resurrect after mol squash deletes them.
|
|
||||||
issues := make([]*types.Issue, 0, len(issueMap))
|
|
||||||
wispsSkipped := 0
|
|
||||||
for _, issue := range issueMap {
|
|
||||||
if issue.Ephemeral {
|
|
||||||
wispsSkipped++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
issues = append(issues, issue)
|
|
||||||
}
|
|
||||||
if wispsSkipped > 0 {
|
|
||||||
debug.Logf("auto-flush: filtered %d wisps from export", wispsSkipped)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter issues by prefix in multi-repo mode for non-primary repos (fixes GH #437)
|
|
||||||
// In multi-repo mode, non-primary repos should only export issues that match
|
|
||||||
// their own prefix. Issues from other repos (hydrated for unified view) should
|
|
||||||
// NOT be written to the local JSONL.
|
|
||||||
multiRepo := config.GetMultiRepoConfig()
|
|
||||||
if multiRepo != nil {
|
|
||||||
// Get our configured prefix
|
|
||||||
prefix, prefixErr := store.GetConfig(ctx, "issue_prefix")
|
|
||||||
if prefixErr == nil && prefix != "" {
|
|
||||||
// Determine if we're the primary repo
|
|
||||||
cwd, _ := os.Getwd()
|
|
||||||
primaryPath := multiRepo.Primary
|
|
||||||
if primaryPath == "" || primaryPath == "." {
|
|
||||||
primaryPath = cwd
|
|
||||||
}
|
|
||||||
|
|
||||||
// Normalize paths for comparison
|
|
||||||
absCwd, _ := filepath.Abs(cwd)
|
|
||||||
absPrimary, _ := filepath.Abs(primaryPath)
|
|
||||||
|
|
||||||
isPrimary := absCwd == absPrimary
|
|
||||||
|
|
||||||
if !isPrimary {
|
|
||||||
// Filter to only issues matching our prefix
|
|
||||||
filtered := make([]*types.Issue, 0, len(issues))
|
|
||||||
prefixWithDash := prefix
|
|
||||||
if !strings.HasSuffix(prefixWithDash, "-") {
|
|
||||||
prefixWithDash = prefix + "-"
|
|
||||||
}
|
|
||||||
for _, issue := range issues {
|
|
||||||
if strings.HasPrefix(issue.ID, prefixWithDash) {
|
|
||||||
filtered = append(filtered, issue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug.Logf("multi-repo filter: %d issues -> %d (prefix %s)", len(issues), len(filtered), prefix)
|
|
||||||
issues = filtered
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write atomically using common helper
|
|
||||||
exportedIDs, err := writeJSONLAtomic(jsonlPath, issues)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
recordFailure(err)
|
recordFlushFailure(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(dirtyIDs) == 0 && !fullExport {
|
||||||
|
recordFlushSuccess()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear only the dirty issues that were actually exported (fixes race condition)
|
// Read existing JSONL into a map (skip for full export - we'll rebuild from scratch)
|
||||||
// Don't clear issues that were skipped due to timestamp-only changes
|
var issueMap map[string]*types.Issue
|
||||||
|
if fullExport {
|
||||||
|
issueMap = make(map[string]*types.Issue)
|
||||||
|
} else {
|
||||||
|
issueMap, err = readExistingJSONL(jsonlPath)
|
||||||
|
if err != nil {
|
||||||
|
recordFlushFailure(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch dirty issues from DB and merge into map
|
||||||
|
if err := fetchAndMergeIssues(ctx, store, dirtyIDs, issueMap); err != nil {
|
||||||
|
recordFlushFailure(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert map to slice, filtering out wisps
|
||||||
|
issues := filterWisps(issueMap)
|
||||||
|
|
||||||
|
// Filter by prefix in multi-repo mode
|
||||||
|
issues = filterByMultiRepoPrefix(ctx, store, issues)
|
||||||
|
|
||||||
|
// Write atomically
|
||||||
|
exportedIDs, err := writeJSONLAtomic(jsonlPath, issues)
|
||||||
|
if err != nil {
|
||||||
|
recordFlushFailure(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear dirty issues that were exported
|
||||||
if len(exportedIDs) > 0 {
|
if len(exportedIDs) > 0 {
|
||||||
if err := store.ClearDirtyIssuesByID(ctx, exportedIDs); err != nil {
|
if err := store.ClearDirtyIssuesByID(ctx, exportedIDs); err != nil {
|
||||||
// Don't fail the whole flush for this, but warn
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: failed to clear dirty issues: %v\n", err)
|
fmt.Fprintf(os.Stderr, "Warning: failed to clear dirty issues: %v\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store hash of exported JSONL (enables hash-based auto-import)
|
// Update metadata (hashes, timestamps)
|
||||||
// Renamed from last_import_hash to jsonl_content_hash
|
updateFlushExportMetadata(ctx, store, jsonlPath)
|
||||||
jsonlData, err := os.ReadFile(jsonlPath)
|
|
||||||
if err == nil {
|
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write(jsonlData)
|
|
||||||
exportedHash := hex.EncodeToString(hasher.Sum(nil))
|
|
||||||
if err := store.SetMetadata(ctx, "jsonl_content_hash", exportedHash); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_content_hash after export: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store JSONL file hash for integrity validation
|
recordFlushSuccess()
|
||||||
if err := store.SetJSONLFileHash(ctx, exportedHash); err != nil {
|
}
|
||||||
fmt.Fprintf(os.Stderr, "Warning: failed to update jsonl_file_hash after export: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update last_import_time so staleness check doesn't see JSONL as "newer" (fixes #399)
|
// getIssuesToExport determines which issue IDs need to be exported.
|
||||||
// CheckStaleness() compares last_import_time against JSONL mtime. After export,
|
// For full export, returns all issue IDs. For incremental, returns only dirty IDs.
|
||||||
// the JSONL mtime is updated, so we must also update last_import_time to prevent
|
func getIssuesToExport(ctx context.Context, fullExport bool) ([]string, error) {
|
||||||
// false "stale" detection on subsequent reads.
|
if fullExport {
|
||||||
//
|
allIssues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||||
// Use RFC3339Nano to preserve nanosecond precision. The file mtime has nanosecond
|
if err != nil {
|
||||||
// precision, so using RFC3339 (second precision) would cause the stored time to be
|
return nil, fmt.Errorf("failed to get all issues: %w", err)
|
||||||
// slightly earlier than the file mtime, triggering false staleness.
|
|
||||||
exportTime := time.Now().Format(time.RFC3339Nano)
|
|
||||||
if err := store.SetMetadata(ctx, "last_import_time", exportTime); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "Warning: failed to update last_import_time after export: %v\n", err)
|
|
||||||
}
|
}
|
||||||
|
ids := make([]string, len(allIssues))
|
||||||
|
for i, issue := range allIssues {
|
||||||
|
ids[i] = issue.ID
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Success! FlushManager manages its local state in run() goroutine.
|
dirtyIDs, err := store.GetDirtyIssues(ctx)
|
||||||
recordSuccess()
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get dirty issues: %w", err)
|
||||||
|
}
|
||||||
|
return dirtyIDs, nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user