feat(sync): pull-first sync with 3-way merge (#918)

* feat(sync): implement pull-first synchronization strategy

- Add --pull-first flag and logic to sync command
- Introduce 3-way merge stub for issue synchronization
- Add concurrent edit tests for the pull-first flow

Ensures local changes are reconciled with remote updates before pushing to prevent data loss.

* feat(sync): implement 3-way merge and state tracking

- Implement 3-way merge algorithm for issue synchronization
- Add base state storage to track changes between syncs
- Add comprehensive tests for merge logic and persistence

Ensures data consistency and prevents data loss during concurrent
issue updates.

* feat(sync): implement field-level conflict merging

- Implement field-level merge logic for issue conflicts
- Add unit tests for field-level merge strategies

Reduces manual intervention by automatically resolving overlapping updates at the field level.

* refactor(sync): simplify sync flow by removing ZFC checks

The previous sync implementation relied on Zero-False-Convergence (ZFC)
staleness checks which are redundant following the transition to
structural 3-way merging. This legacy logic added complexity and
maintenance overhead without providing additional safety.

This commit introduces a streamlined sync pipeline:
- Remove ZFC staleness validation from primary sync flow
- Update safety documentation to reflect current merge strategy
- Eliminate deprecated unit tests associated with ZFC logic

These changes reduce codebase complexity while maintaining data
integrity through the robust structural 3-way merge implementation.

* feat(sync): default to pull-first sync workflow

- Set pull-first as the primary synchronization workflow
- Refactor core sync logic for better maintainability
- Update concurrent edit tests to validate 3-way merge logic

Reduces merge conflicts by ensuring local state is current before pushing changes.

* refactor(sync): clean up lint issues in merge code

- Remove unused error return from MergeIssues (it never returned an error)
- Use _ prefix for unused _base parameter in mergeFieldLevel
- Update callers to not expect error from MergeIssues
- Keep nolint:gosec for trusted internal file path

* test(sync): add mode compatibility and upgrade safety tests

Add tests addressing Steve's PR #918 review concerns:

- TestSyncBranchModeWithPullFirst: Verifies sync-branch config
  storage and git branch creation work with pull-first
- TestExternalBeadsDirWithPullFirst: Verifies external BEADS_DIR
  detection and pullFromExternalBeadsRepo
- TestUpgradeFromOldSync: Validates upgrade safety when
  sync_base.jsonl doesn't exist (first sync after upgrade)
- TestMergeIssuesWithBaseState: Comprehensive 3-way merge cases
- TestLabelUnionMerge: Verifies labels use union (no data loss)

Key upgrade behavior validated:
- base=nil (no sync_base.jsonl) safely handles all cases
- Local-only issues kept (StrategyLocal)
- Remote-only issues kept (StrategyRemote)
- Overlapping issues merged (LWW scalars, union labels)
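
The union rule for labels can be sketched as follows (illustrative helper, not the real merge code): unlike LWW scalars, set-valued fields keep additions from both sides, so a label added on either machine survives the merge.

```go
package main

import (
	"fmt"
	"sort"
)

// unionLabels merges label sets from both sides so no label added on either
// machine is lost, in contrast to last-writer-wins handling of scalar fields.
func unionLabels(local, remote []string) []string {
	seen := make(map[string]bool)
	var out []string
	for _, l := range append(append([]string{}, local...), remote...) {
		if !seen[l] {
			seen[l] = true
			out = append(out, l)
		}
	}
	sort.Strings(out) // deterministic order keeps JSONL exports stable
	return out
}

func main() {
	fmt.Println(unionLabels([]string{"bug", "p1"}, []string{"p1", "sync"}))
	// → [bug p1 sync]
}
```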

* fix(sync): report line numbers for malformed JSON

Problem:
- JSON decoding errors when loading sync base state lacked line numbers
- Difficult to identify location of syntax errors in large state files

Solution:
- Include line number reporting in JSON decoder errors during state loading
- Add regression tests for malformed sync base file scenarios

Impact:
- Users receive actionable feedback for corrupted state files
- Faster troubleshooting of manual configuration errors

* fix(sync): warn on large clock skew during sync

Problem:
- Unsynchronized clocks between systems could lead to silent merge errors
- No mechanism existed to alert users of significant timestamp drift

Solution:
- Implement clock skew detection during sync merge
- Log a warning when large timestamp differences are found
- Add comprehensive unit tests for skew reporting

Impact:
- Users are alerted to potential synchronization risks
- Easier debugging of time-related merge issues

* fix(sync): defer state update until remote push succeeds

Problem:
- Base state updated before confirming remote push completion
- Failed pushes resulted in inconsistent local state tracking

Solution:
- Defer base state update until after the remote push succeeds

Impact:
- Ensures local state accurately reflects remote repository status
- Prevents state desynchronization during network or push failures

* fix(sync): prevent concurrent sync operations

Problem:
- Multiple sync processes could run simultaneously
- Overlapping operations risk data corruption and race conditions

Solution:
- Implement file-based locking using gofrs/flock
- Add integration tests to verify locking behavior

Impact:
- Guarantees execution of a single sync process at a time
- Eliminates potential for data inconsistency during sync

* docs: document sync architecture and merge model

- Detail the 3-way merge model logic
- Describe the core synchronization architecture principles

* fix(lint): explicitly ignore lock.Unlock return value

errcheck linter flagged bare defer lock.Unlock() calls. Wrap in
anonymous function with explicit _ assignment to acknowledge
intentional ignore of unlock errors during cleanup.
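
The pattern, with a stand-in locker type (the real value is a *flock.Flock):

```go
package main

import "fmt"

// locker stands in for *flock.Flock: Unlock returns an error.
type locker struct{ held bool }

func (l *locker) Unlock() error { l.held = false; return nil }

func run(l *locker) {
	// A bare `defer l.Unlock()` trips errcheck; the explicit `_ =` inside an
	// anonymous function documents that the unlock error is intentionally
	// ignored during cleanup.
	defer func() { _ = l.Unlock() }()
	fmt.Println("syncing while holding lock:", l.held)
}

func main() {
	l := &locker{held: true}
	run(l)
	fmt.Println("held after cleanup:", l.held) // → held after cleanup: false
}
```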

* fix(lint): add sync_merge.go to G304 exclusions

The loadBaseState and saveBaseState functions use file paths derived
from trusted internal sources (beadsDir parameter from config). Add
to existing G304 exclusion list for safe JSONL file operations.

* feat(sync): integrate sync-branch into pull-first flow

When sync.branch is configured, doPullFirstSync now:
- Calls PullFromSyncBranch before merge
- Calls CommitToSyncBranch after export

This ensures sync-branch mode uses the correct branch for
pull/push operations.

* test(sync): add E2E tests for sync-branch and external BEADS_DIR

Adds comprehensive end-to-end tests:
- TestSyncBranchE2E: verifies pull→merge→commit flow with remote changes
- TestExternalBeadsDirE2E: verifies sync with separate beads repository
- TestExternalBeadsDirDetection: edge cases for repo detection
- TestCommitToExternalBeadsRepo: commit handling

* refactor(sync): remove unused rollbackJSONLFromGit

Function was defined but never called. Pull-first flow saves base
state after successful push, making this safety net unnecessary.

* test(sync): add export-only mode E2E test

Add TestExportOnlySync to cover --no-pull flag which was the only
untested sync mode. This completes full mode coverage:

- Normal (pull-first): sync_test.go, sync_merge_test.go
- Sync-branch: sync_modes_test.go:TestSyncBranchE2E (PR#918)
- External BEADS_DIR: sync_external_test.go (PR#918)
- From-main: sync_branch_priority_test.go
- Local-only: sync_local_only_test.go
- Export-only: sync_modes_test.go:TestExportOnlySync (this commit)

Refs: #911

* docs(sync): add sync modes reference section

Document all 6 sync modes with triggers, flows, and use cases.
Include mode selection decision tree and test coverage matrix.

Co-authored-by: Claude <noreply@anthropic.com>

* test(sync): upgrade sync-branch E2E tests to bare repo

- Replace mocked repository with real bare repo setup
- Implement multi-machine simulation in sync tests
- Refactor test logic to handle distributed states

Coverage: sync-branch end-to-end scenarios

* test(sync): add daemon sync-branch E2E tests

- Implement E2E tests for daemon sync-branch flow
- Add test cases for force-overwrite scenarios

Coverage: daemon sync-branch workflow in cmd/bd

* docs(sync): document sync-branch paths and E2E architecture

- Describe sync-branch CLI and Daemon execution flow
- Document the end-to-end test architecture

* build(nix): update vendorHash for gofrs/flock dependency

The new gofrs/flock dependency for file-based sync locking changes
the Go module checksum.

---------

Co-authored-by: Claude <noreply@anthropic.com>
Peter Chanthamynavong
2026-01-07 21:27:20 -08:00
committed by GitHub
parent e0b613d5b1
commit 1561374c04
14 changed files with 4188 additions and 925 deletions


@@ -2,8 +2,6 @@ package main
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
@@ -11,6 +9,7 @@ import (
"testing"
"time"
"github.com/gofrs/flock"
"github.com/steveyegge/beads/internal/storage/sqlite"
"github.com/steveyegge/beads/internal/syncbranch"
"github.com/steveyegge/beads/internal/types"
@@ -437,167 +436,8 @@ func TestHasJSONLConflict_MultipleConflicts(t *testing.T) {
}
}
// TestZFCSkipsExportAfterImport tests the bd-l0r fix: after importing JSONL due to
// stale DB detection, sync should skip export to avoid overwriting the JSONL source of truth.
func TestZFCSkipsExportAfterImport(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
t.Chdir(tmpDir)
// Setup beads directory with JSONL
beadsDir := filepath.Join(tmpDir, ".beads")
os.MkdirAll(beadsDir, 0755)
jsonlPath := filepath.Join(beadsDir, "beads.jsonl")
// Create JSONL with 10 issues (simulating pulled state after cleanup)
var jsonlLines []string
for i := 1; i <= 10; i++ {
line := fmt.Sprintf(`{"id":"bd-%d","title":"JSONL Issue %d","status":"open","issue_type":"task","priority":2,"created_at":"2025-11-24T00:00:00Z","updated_at":"2025-11-24T00:00:00Z"}`, i, i)
jsonlLines = append(jsonlLines, line)
}
os.WriteFile(jsonlPath, []byte(strings.Join(jsonlLines, "\n")+"\n"), 0644)
// Create SQLite store with 100 stale issues (10x the JSONL count = 900% divergence)
testDBPath := filepath.Join(beadsDir, "beads.db")
testStore, err := sqlite.New(ctx, testDBPath)
if err != nil {
t.Fatalf("failed to create test store: %v", err)
}
defer testStore.Close()
// Set issue_prefix to prevent "database not initialized" errors
if err := testStore.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
t.Fatalf("failed to set issue_prefix: %v", err)
}
// Populate DB with 100 issues (stale, 90 closed)
for i := 1; i <= 100; i++ {
status := types.StatusOpen
var closedAt *time.Time
if i > 10 { // First 10 open, rest closed
status = types.StatusClosed
now := time.Now()
closedAt = &now
}
issue := &types.Issue{
Title: fmt.Sprintf("Old Issue %d", i),
Status: status,
ClosedAt: closedAt,
IssueType: types.TypeTask,
Priority: 2,
}
if err := testStore.CreateIssue(ctx, issue, "test-user"); err != nil {
t.Fatalf("failed to create issue %d: %v", i, err)
}
}
// Verify divergence: (100 - 10) / 10 = 900% > 50% threshold
dbCount, _ := countDBIssuesFast(ctx, testStore)
jsonlCount, _ := countIssuesInJSONL(jsonlPath)
divergence := float64(dbCount-jsonlCount) / float64(jsonlCount)
if dbCount != 100 {
t.Fatalf("DB setup failed: expected 100 issues, got %d", dbCount)
}
if jsonlCount != 10 {
t.Fatalf("JSONL setup failed: expected 10 issues, got %d", jsonlCount)
}
if divergence <= 0.5 {
t.Fatalf("Divergence too low: %.2f%% (expected >50%%)", divergence*100)
}
// Set global store for the test
oldStore := store
storeMutex.Lock()
oldStoreActive := storeActive
store = testStore
storeActive = true
storeMutex.Unlock()
defer func() {
storeMutex.Lock()
store = oldStore
storeActive = oldStoreActive
storeMutex.Unlock()
}()
// Save JSONL content hash before running sync logic
beforeHash, _ := computeJSONLHash(jsonlPath)
// Simulate the ZFC check and export step from sync.go lines 126-186
// This is the code path that should detect divergence and skip export
skipExport := false
// ZFC safety check
if err := ensureStoreActive(); err == nil && store != nil {
dbCount, err := countDBIssuesFast(ctx, store)
if err == nil {
jsonlCount, err := countIssuesInJSONL(jsonlPath)
if err == nil && jsonlCount > 0 && dbCount > jsonlCount {
divergence := float64(dbCount-jsonlCount) / float64(jsonlCount)
if divergence > 0.5 {
// Parse JSONL directly (avoid subprocess spawning)
jsonlData, err := os.ReadFile(jsonlPath)
if err != nil {
t.Fatalf("failed to read JSONL: %v", err)
}
// Parse issues from JSONL
var issues []*types.Issue
for _, line := range strings.Split(string(jsonlData), "\n") {
if line == "" {
continue
}
var issue types.Issue
if err := json.Unmarshal([]byte(line), &issue); err != nil {
t.Fatalf("failed to parse JSONL line: %v", err)
}
issue.SetDefaults()
issues = append(issues, &issue)
}
// Import using direct import logic (no subprocess)
opts := ImportOptions{
DryRun: false,
SkipUpdate: false,
}
_, err = importIssuesCore(ctx, testDBPath, testStore, issues, opts)
if err != nil {
t.Fatalf("ZFC import failed: %v", err)
}
skipExport = true
}
}
}
}
// Verify skipExport was set
if !skipExport {
t.Error("Expected skipExport=true after ZFC import, but got false")
}
// Verify DB imported the JSONL issues
// Note: import is additive - it adds/updates but doesn't delete.
// The DB had 100 issues with auto-generated IDs, JSONL has 10 with explicit IDs (bd-1 to bd-10).
// Since there's no overlap, we expect 110 issues total.
afterDBCount, _ := countDBIssuesFast(ctx, testStore)
if afterDBCount != 110 {
t.Errorf("After ZFC import, DB should have 110 issues (100 original + 10 from JSONL), got %d", afterDBCount)
}
// Verify JSONL was NOT modified (no export happened)
afterHash, _ := computeJSONLHash(jsonlPath)
if beforeHash != afterHash {
t.Error("JSONL content changed after ZFC import (export should have been skipped)")
}
// Verify issue count in JSONL is still 10
finalJSONLCount, _ := countIssuesInJSONL(jsonlPath)
if finalJSONLCount != 10 {
t.Errorf("JSONL should still have 10 issues, got %d", finalJSONLCount)
}
t.Logf("✓ ZFC fix verified: divergence detected, import ran, export skipped, JSONL unchanged")
}
// Note: TestZFCSkipsExportAfterImport was removed as ZFC checks are no longer part of the
// legacy sync flow. Use --pull-first for structural staleness handling via 3-way merge.
// TestHashBasedStalenessDetection_bd_f2f tests the bd-f2f fix:
// When JSONL content differs from stored hash (e.g., remote changed status),
@@ -894,3 +734,207 @@ func TestIsExternalBeadsDir(t *testing.T) {
}
})
}
// TestConcurrentEdit tests the pull-first sync flow with concurrent edits.
// This validates the 3-way merge logic for the pull-first sync refactor (#911).
//
// Scenario:
// - Base state exists (issue bd-1 at version 2025-01-01)
// - Local modifies issue (version 2025-01-02)
// - Remote also modifies issue (version 2025-01-03)
// - 3-way merge detects conflict and resolves using LWW (remote wins)
func TestConcurrentEdit(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
t.Chdir(tmpDir)
// Setup: Initialize git repo
if err := exec.Command("git", "init", "--initial-branch=main").Run(); err != nil {
t.Fatalf("git init failed: %v", err)
}
_ = exec.Command("git", "config", "user.email", "test@test.com").Run()
_ = exec.Command("git", "config", "user.name", "Test User").Run()
// Setup: Create beads directory with JSONL (base state)
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("mkdir failed: %v", err)
}
jsonlPath := filepath.Join(beadsDir, "issues.jsonl")
// Base state: single issue at 2025-01-01
baseTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
baseIssue := `{"id":"bd-1","title":"Original Title","status":"open","issue_type":"task","priority":2,"created_at":"2025-01-01T00:00:00Z","updated_at":"2025-01-01T00:00:00Z"}`
if err := os.WriteFile(jsonlPath, []byte(baseIssue+"\n"), 0644); err != nil {
t.Fatalf("write JSONL failed: %v", err)
}
// Initial commit
_ = exec.Command("git", "add", ".").Run()
if err := exec.Command("git", "commit", "-m", "initial").Run(); err != nil {
t.Fatalf("initial commit failed: %v", err)
}
// Create database and import base state
testDBPath := filepath.Join(beadsDir, "beads.db")
testStore, err := sqlite.New(ctx, testDBPath)
if err != nil {
t.Fatalf("failed to create test store: %v", err)
}
defer testStore.Close()
// Set issue_prefix
if err := testStore.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
t.Fatalf("failed to set issue_prefix: %v", err)
}
// Load base state for 3-way merge
baseIssues, err := loadIssuesFromJSONL(jsonlPath)
if err != nil {
t.Fatalf("loadIssuesFromJSONL (base) failed: %v", err)
}
// Create local issue (modified at 2025-01-02)
localTime := time.Date(2025, 1, 2, 0, 0, 0, 0, time.UTC)
localIssueObj := &types.Issue{
ID: "bd-1",
Title: "Local Edit",
Status: types.StatusOpen,
IssueType: types.TypeTask,
Priority: 2,
CreatedAt: baseTime,
UpdatedAt: localTime,
}
localIssues := []*types.Issue{localIssueObj}
// Simulate "remote" edit: change title in JSONL (modified at 2025-01-03 - later)
remoteIssue := `{"id":"bd-1","title":"Remote Edit","status":"open","issue_type":"task","priority":2,"created_at":"2025-01-01T00:00:00Z","updated_at":"2025-01-03T00:00:00Z"}`
if err := os.WriteFile(jsonlPath, []byte(remoteIssue+"\n"), 0644); err != nil {
t.Fatalf("write remote JSONL failed: %v", err)
}
remoteIssues, err := loadIssuesFromJSONL(jsonlPath)
if err != nil {
t.Fatalf("loadIssuesFromJSONL (remote) failed: %v", err)
}
if len(remoteIssues) != 1 {
t.Fatalf("expected 1 remote issue, got %d", len(remoteIssues))
}
// 3-way merge with base state
mergeResult := MergeIssues(baseIssues, localIssues, remoteIssues)
// Verify merge result
if len(mergeResult.Merged) != 1 {
t.Fatalf("expected 1 merged issue, got %d", len(mergeResult.Merged))
}
// LWW: Remote wins because it has later updated_at (2025-01-03 > 2025-01-02)
if mergeResult.Merged[0].Title != "Remote Edit" {
t.Errorf("expected merged title 'Remote Edit' (remote wins LWW), got '%s'", mergeResult.Merged[0].Title)
}
// Verify strategy: should be "merged" (conflict resolved by LWW)
if mergeResult.Strategy["bd-1"] != StrategyMerged {
t.Errorf("expected strategy '%s' for bd-1, got '%s'", StrategyMerged, mergeResult.Strategy["bd-1"])
}
// Verify 1 conflict was detected and resolved
if mergeResult.Conflicts != 1 {
t.Errorf("expected 1 conflict (both sides modified), got %d", mergeResult.Conflicts)
}
t.Log("TestConcurrentEdit: 3-way merge with LWW resolution validated")
}
// TestConcurrentSyncBlocked tests that concurrent syncs are blocked by file lock.
// This validates the P0 fix for preventing data corruption when running bd sync
// from multiple terminals simultaneously.
func TestConcurrentSyncBlocked(t *testing.T) {
ctx := context.Background()
tmpDir := t.TempDir()
t.Chdir(tmpDir)
// Setup: Initialize git repo
if err := exec.Command("git", "init", "--initial-branch=main").Run(); err != nil {
t.Fatalf("git init failed: %v", err)
}
_ = exec.Command("git", "config", "user.email", "test@test.com").Run()
_ = exec.Command("git", "config", "user.name", "Test User").Run()
// Setup: Create beads directory with JSONL
beadsDir := filepath.Join(tmpDir, ".beads")
if err := os.MkdirAll(beadsDir, 0755); err != nil {
t.Fatalf("mkdir failed: %v", err)
}
jsonlPath := filepath.Join(beadsDir, "issues.jsonl")
// Create initial JSONL
if err := os.WriteFile(jsonlPath, []byte(`{"id":"bd-1","title":"Test"}`+"\n"), 0644); err != nil {
t.Fatalf("write JSONL failed: %v", err)
}
// Initial commit
_ = exec.Command("git", "add", ".").Run()
if err := exec.Command("git", "commit", "-m", "initial").Run(); err != nil {
t.Fatalf("initial commit failed: %v", err)
}
// Create database
testDBPath := filepath.Join(beadsDir, "beads.db")
testStore, err := sqlite.New(ctx, testDBPath)
if err != nil {
t.Fatalf("failed to create test store: %v", err)
}
defer testStore.Close()
// Set issue_prefix
if err := testStore.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
t.Fatalf("failed to set issue_prefix: %v", err)
}
// Simulate another sync holding the lock
lockPath := filepath.Join(beadsDir, ".sync.lock")
lock := flock.New(lockPath)
locked, err := lock.TryLock()
if err != nil {
t.Fatalf("failed to acquire lock: %v", err)
}
if !locked {
t.Fatal("expected to acquire lock")
}
// Now try to acquire the same lock (simulating concurrent sync)
lock2 := flock.New(lockPath)
locked2, err := lock2.TryLock()
if err != nil {
t.Fatalf("TryLock error: %v", err)
}
// Second lock attempt should fail (not block)
if locked2 {
_ = lock2.Unlock()
t.Error("expected second lock attempt to fail (concurrent sync should be blocked)")
} else {
t.Log("Concurrent sync correctly blocked by file lock")
}
// Release first lock
if err := lock.Unlock(); err != nil {
t.Fatalf("failed to unlock: %v", err)
}
// Now lock should be acquirable again
lock3 := flock.New(lockPath)
locked3, err := lock3.TryLock()
if err != nil {
t.Fatalf("TryLock error after unlock: %v", err)
}
if !locked3 {
t.Error("expected lock to be acquirable after first sync completes")
} else {
_ = lock3.Unlock()
t.Log("Lock correctly acquirable after first sync completes")
}
}