feat(sync): add incremental export for large repos

For repos with 1000+ issues where fewer than 20% of issues are dirty,
incremental export reads the existing JSONL, merges only the
changed issues, and writes the file back — avoiding a full re-export.

- Add exportToJSONLIncrementalDeferred as new default export path
- Add shouldUseIncrementalExport to check thresholds
- Add performIncrementalExport for merge-based export
- Add readJSONLToMap helper for fast JSONL parsing
- Falls back to full export when incremental is not beneficial

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
beads/crew/lydia
2026-01-20 21:26:08 -08:00
committed by Steve Yegge
parent d3d2326a8b
commit 63c2e50158
3 changed files with 740 additions and 2 deletions

View File

@@ -1,6 +1,7 @@
package main
import (
"bufio"
"cmp"
"context"
"encoding/json"
@@ -18,6 +19,15 @@ import (
"github.com/steveyegge/beads/internal/validation"
)
// Incremental export thresholds.
// These gate the incremental path in shouldUseIncrementalExport: incremental
// export only pays off for large repos where a small fraction of issues changed.
const (
	// incrementalThreshold is the minimum total issue count to consider
	// incremental export; smaller repos re-export quickly with the full path.
	incrementalThreshold = 1000
	// incrementalDirtyRatio is the max ratio of dirty/total issues for incremental export.
	// If more than 20% of issues are dirty, full export is likely faster than
	// the read-merge-write cycle.
	incrementalDirtyRatio = 0.20
)
// ExportResult contains information needed to finalize an export after git commit.
// This enables atomic sync by deferring metadata updates until after git commit succeeds.
// See GH#885 for the atomicity gap this fixes.
@@ -259,6 +269,295 @@ func exportToJSONLDeferred(ctx context.Context, jsonlPath string) (*ExportResult
}, nil
}
// exportToJSONLIncrementalDeferred exports issues to JSONL, preferring an
// incremental merge for large repos with few dirty issues and falling back to
// the full export path whenever the incremental strategy does not apply.
//
// The returned ExportResult is used for deferred finalization after the git
// commit succeeds, exactly as with exportToJSONLDeferred.
func exportToJSONLIncrementalDeferred(ctx context.Context, jsonlPath string) (*ExportResult, error) {
	// The daemon has its own export optimization; delegate to it when running.
	if daemonClient != nil {
		return exportToJSONLDeferred(ctx, jsonlPath)
	}

	if err := ensureStoreActive(); err != nil {
		return nil, fmt.Errorf("failed to initialize store: %w", err)
	}

	incremental, dirty, checkErr := shouldUseIncrementalExport(ctx, jsonlPath)
	switch {
	case checkErr != nil:
		// Threshold check failed; the full export is always a safe fallback.
		return exportToJSONLDeferred(ctx, jsonlPath)
	case !incremental:
		return exportToJSONLDeferred(ctx, jsonlPath)
	case len(dirty) == 0:
		// Nothing is dirty: return a valid result for idempotency without
		// rewriting the file.
		hash, _ := computeJSONLHash(jsonlPath)
		return &ExportResult{
			JSONLPath:   jsonlPath,
			ExportedIDs: []string{},
			ContentHash: hash,
			ExportTime:  time.Now().Format(time.RFC3339Nano),
		}, nil
	default:
		return performIncrementalExport(ctx, jsonlPath, dirty)
	}
}
// shouldUseIncrementalExport reports whether incremental export is worthwhile
// for the JSONL file at jsonlPath, returning the dirty issue IDs alongside.
//
// Incremental export requires an existing JSONL file, at least
// incrementalThreshold issues in it, and a dirty ratio no greater than
// incrementalDirtyRatio. A (true, empty, nil) result means nothing is dirty
// and the export can be skipped entirely.
func shouldUseIncrementalExport(ctx context.Context, jsonlPath string) (bool, []string, error) {
	// Without an existing file there is nothing to merge into.
	if _, statErr := os.Stat(jsonlPath); os.IsNotExist(statErr) {
		return false, nil, nil
	}

	dirty, err := store.GetDirtyIssues(ctx)
	if err != nil {
		return false, nil, fmt.Errorf("failed to get dirty issues: %w", err)
	}
	// Clean repo: the caller can skip the export altogether.
	if len(dirty) == 0 {
		return true, dirty, nil
	}

	// Count existing issues via a cheap line scan; if the file is unreadable,
	// fall back to full export.
	total, countErr := countIssuesInJSONL(jsonlPath)
	if countErr != nil {
		return false, nil, nil
	}

	// Small repos re-export quickly; heavily-dirty repos merge slowly.
	if total < incrementalThreshold {
		return false, nil, nil
	}
	if float64(len(dirty))/float64(total) > incrementalDirtyRatio {
		return false, nil, nil
	}
	return true, dirty, nil
}
// performIncrementalExport merges dirty issues into an existing JSONL file.
//
// It reads the current JSONL into memory, loads only the dirty issues from the
// store (with their dependencies, labels, and comments), overwrites or removes
// the corresponding entries, and atomically rewrites the file in sorted-ID
// order via a temp file + rename. On a read error it falls back to full export.
//
// The returned ExportResult's ExportedIDs contains only dirtyIDs: those are
// the issues whose dirty flags must be cleared after the git commit, even
// though the rewritten file contains every issue.
func performIncrementalExport(ctx context.Context, jsonlPath string, dirtyIDs []string) (*ExportResult, error) {
	// Read existing JSONL into map[id]rawJSON.
	issueMap, allIDs, err := readJSONLToMap(jsonlPath)
	if err != nil {
		// Fall back to full export on read error.
		return exportToJSONLDeferred(ctx, jsonlPath)
	}

	// Query dirty issues from the database, caching every result (including
	// nil for deleted issues) so the deletion/tombstone pass below does not
	// need a second GetIssue round trip.
	dirtyIssues := make([]*types.Issue, 0, len(dirtyIDs))
	issueByID := make(map[string]*types.Issue, len(dirtyIDs))
	for _, id := range dirtyIDs {
		issue, err := store.GetIssue(ctx, id)
		if err != nil {
			return nil, fmt.Errorf("failed to get dirty issue %s: %w", id, err)
		}
		issueByID[id] = issue // may be nil for fully deleted issues
		if issue != nil {
			dirtyIssues = append(dirtyIssues, issue)
		}
	}

	// Attach dependencies. GetAllDependencyRecords is used because there is no
	// batch method for specific IDs; for truly large repos this could be
	// optimized with a targeted query.
	allDeps, err := store.GetAllDependencyRecords(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get dependencies: %w", err)
	}
	for _, issue := range dirtyIssues {
		issue.Dependencies = allDeps[issue.ID]
	}

	// Attach labels (batch query).
	labelsMap, err := store.GetLabelsForIssues(ctx, dirtyIDs)
	if err != nil {
		return nil, fmt.Errorf("failed to get labels: %w", err)
	}
	for _, issue := range dirtyIssues {
		issue.Labels = labelsMap[issue.ID]
	}

	// Attach comments (batch query).
	commentsMap, err := store.GetCommentsForIssues(ctx, dirtyIDs)
	if err != nil {
		return nil, fmt.Errorf("failed to get comments: %w", err)
	}
	for _, issue := range dirtyIssues {
		issue.Comments = commentsMap[issue.ID]
	}

	// Merge dirty issues into the map, tracking which IDs are new to the file.
	idSet := make(map[string]bool, len(allIDs))
	for _, id := range allIDs {
		idSet[id] = true
	}
	for _, issue := range dirtyIssues {
		// Skip wisps - they should never be exported.
		if issue.Ephemeral {
			continue
		}
		data, err := json.Marshal(issue)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal issue %s: %w", issue.ID, err)
		}
		issueMap[issue.ID] = data
		if !idSet[issue.ID] {
			allIDs = append(allIDs, issue.ID)
			idSet[issue.ID] = true
		}
	}

	// Handle tombstones and deletions using the cached GetIssue results
	// (no second GetIssue call).
	for _, id := range dirtyIDs {
		issue := issueByID[id]
		if issue == nil {
			// Issue was fully deleted (not even a tombstone).
			delete(issueMap, id)
		} else if issue.Status == types.StatusTombstone {
			// Tombstones stay in the export so the deletion propagates.
			if !issue.Ephemeral {
				data, err := json.Marshal(issue)
				if err != nil {
					return nil, fmt.Errorf("failed to marshal tombstone %s: %w", id, err)
				}
				issueMap[id] = data
			}
		}
	}

	// Build the sorted list of surviving IDs (deleted ones are already gone).
	finalIDs := make([]string, 0, len(issueMap))
	for id := range issueMap {
		finalIDs = append(finalIDs, id)
	}
	slices.Sort(finalIDs)

	// Write to a temp file in the same directory, then atomically rename.
	dir := filepath.Dir(jsonlPath)
	base := filepath.Base(jsonlPath)
	tempFile, err := os.CreateTemp(dir, base+".tmp.*")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()
	defer func() {
		// Best-effort cleanup; after a successful rename the Remove is a no-op.
		_ = tempFile.Close()
		_ = os.Remove(tempPath)
	}()

	// Write JSONL in sorted order.
	for _, id := range finalIDs {
		if _, err := tempFile.Write(issueMap[id]); err != nil {
			return nil, fmt.Errorf("failed to write issue %s: %w", id, err)
		}
		if _, err := tempFile.WriteString("\n"); err != nil {
			return nil, fmt.Errorf("failed to write newline: %w", err)
		}
	}

	// Check the Close error: a write failure surfacing at Close must abort
	// before the rename, or we would atomically install a truncated file.
	if err := tempFile.Close(); err != nil {
		return nil, fmt.Errorf("failed to close temp file: %w", err)
	}
	if err := os.Rename(tempPath, jsonlPath); err != nil {
		return nil, fmt.Errorf("failed to replace JSONL file: %w", err)
	}
	// Permission failure is non-fatal: the content is already in place.
	if err := os.Chmod(jsonlPath, 0600); err != nil {
		fmt.Fprintf(os.Stderr, "Warning: failed to set file permissions: %v\n", err)
	}

	contentHash, _ := computeJSONLHash(jsonlPath)
	return &ExportResult{
		JSONLPath:   jsonlPath,
		ExportedIDs: dirtyIDs, // only clear dirty flags for actually dirty issues
		ContentHash: contentHash,
		ExportTime:  time.Now().Format(time.RFC3339Nano),
	}, nil
}
// readJSONLToMap parses a JSONL file into a map from issue ID to the raw JSON
// line, plus the IDs in the order they appear in the file.
//
// Blank lines, lines that are not valid JSON, and lines without an "id" field
// are silently skipped. A later line with the same ID replaces the earlier
// map entry, though the ID appears again in the order slice.
func readJSONLToMap(jsonlPath string) (map[string]json.RawMessage, []string, error) {
	// #nosec G304 - controlled path
	f, err := os.Open(jsonlPath)
	if err != nil {
		return nil, nil, err
	}
	defer func() { _ = f.Close() }()

	byID := make(map[string]json.RawMessage)
	var order []string

	sc := bufio.NewScanner(f)
	// Allow lines up to 2 MiB; issues with large bodies exceed the default cap.
	sc.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
	for sc.Scan() {
		raw := sc.Bytes()
		if len(raw) == 0 {
			continue
		}
		// Pull out just the "id" field without unmarshaling the whole issue.
		var header struct {
			ID string `json:"id"`
		}
		if json.Unmarshal(raw, &header) != nil || header.ID == "" {
			continue // skip malformed or ID-less lines
		}
		// The scanner reuses its buffer, so retain a private copy of the line.
		byID[header.ID] = json.RawMessage(append([]byte(nil), raw...))
		order = append(order, header.ID)
	}
	if err := sc.Err(); err != nil {
		return nil, nil, err
	}
	return byID, order, nil
}
// validateOpenIssuesForSync validates all open issues against their templates
// before export, based on the validation.on-sync config setting.
// Returns an error if validation.on-sync is "error" and issues fail validation.