Fix multi-round convergence for N-way collisions (bd-108)
- Add AllocateNextID() public method to SQLiteStorage for cross-package ID allocation - Enhance handleRename() to handle collision during rename with retry logic - Fix stale ID map issue by removing deleted IDs from dbByID after rename - Update edge case tests to use convergence rounds consistently - All N-way collision tests now pass (TestFiveCloneCollision, TestEdgeCases)
This commit is contained in:
@@ -402,9 +402,13 @@ func testIdenticalContent(t *testing.T, numClones int) {
|
||||
syncCloneWithConflictResolution(t, cloneDirs[syncOrder[i]], syncOrder[i], i == 0)
|
||||
}
|
||||
|
||||
// Final pull
|
||||
for name, dir := range cloneDirs {
|
||||
finalPullForClone(t, dir, name)
|
||||
// Final convergence rounds
|
||||
for round := 1; round <= 3; round++ {
|
||||
for i := 0; i < numClones; i++ {
|
||||
name := string(rune('A' + i))
|
||||
dir := cloneDirs[name]
|
||||
syncCloneWithConflictResolution(t, dir, name, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all clones have exactly one issue (deduplication worked)
|
||||
@@ -450,9 +454,13 @@ func testOneDifferent(t *testing.T, numClones int) {
|
||||
syncCloneWithConflictResolution(t, cloneDirs[syncOrder[i]], syncOrder[i], i == 0)
|
||||
}
|
||||
|
||||
// Final pull
|
||||
for name, dir := range cloneDirs {
|
||||
finalPullForClone(t, dir, name)
|
||||
// Final convergence rounds
|
||||
for round := 1; round <= 3; round++ {
|
||||
for i := 0; i < numClones; i++ {
|
||||
name := string(rune('A' + i))
|
||||
dir := cloneDirs[name]
|
||||
syncCloneWithConflictResolution(t, dir, name, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all clones have exactly 2 issues
|
||||
@@ -503,9 +511,15 @@ func testMixedCollisions(t *testing.T, numClones int) {
|
||||
syncCloneWithConflictResolution(t, cloneDirs[syncOrder[i]], syncOrder[i], i == 0)
|
||||
}
|
||||
|
||||
// Final pull
|
||||
for name, dir := range cloneDirs {
|
||||
finalPullForClone(t, dir, name)
|
||||
// Final convergence rounds - same as TestFiveCloneCollision
|
||||
t.Log("Final convergence rounds")
|
||||
for round := 1; round <= 3; round++ {
|
||||
t.Logf("Convergence round %d", round)
|
||||
for i := 0; i < numClones; i++ {
|
||||
name := string(rune('A' + i))
|
||||
dir := cloneDirs[name]
|
||||
syncCloneWithConflictResolution(t, dir, name, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify all clones have all 2*N issues
|
||||
|
||||
@@ -284,7 +284,8 @@ func buildIDMap(issues []*types.Issue) map[string]*types.Issue {
|
||||
}
|
||||
|
||||
// handleRename handles content match with different IDs (rename detected)
|
||||
func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.Issue, incoming *types.Issue) error {
|
||||
// Returns the old ID that was deleted (if any), or empty string if no deletion occurred
|
||||
func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.Issue, incoming *types.Issue) (string, error) {
|
||||
// Check if target ID already exists with the same content (race condition)
|
||||
// This can happen when multiple clones import the same rename simultaneously
|
||||
targetIssue, err := s.GetIssue(ctx, incoming.ID)
|
||||
@@ -292,17 +293,77 @@ func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.
|
||||
// Target ID exists - check if it has the same content
|
||||
if targetIssue.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||
// Same content - check if old ID still exists and delete it
|
||||
deletedID := ""
|
||||
existingCheck, checkErr := s.GetIssue(ctx, existing.ID)
|
||||
if checkErr == nil && existingCheck != nil {
|
||||
if err := s.DeleteIssue(ctx, existing.ID); err != nil {
|
||||
return fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
||||
return "", fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
||||
}
|
||||
deletedID = existing.ID
|
||||
}
|
||||
// The rename is already complete in the database
|
||||
return nil
|
||||
return deletedID, nil
|
||||
}
|
||||
// Different content - this is an unexpected collision
|
||||
return fmt.Errorf("target ID %s already exists with different content", incoming.ID)
|
||||
// Different content - this is a collision during rename
|
||||
// Allocate a new ID for the incoming issue instead of using the desired ID
|
||||
prefix, err := s.GetConfig(ctx, "issue_prefix")
|
||||
if err != nil || prefix == "" {
|
||||
prefix = "bd"
|
||||
}
|
||||
|
||||
oldID := existing.ID
|
||||
|
||||
// Retry up to 3 times to handle concurrent ID allocation
|
||||
const maxRetries = 3
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
// Sync counters before allocation to avoid collisions
|
||||
if attempt > 0 {
|
||||
if syncErr := s.SyncAllCounters(ctx); syncErr != nil {
|
||||
return "", fmt.Errorf("failed to sync counters on retry %d: %w", attempt, syncErr)
|
||||
}
|
||||
}
|
||||
|
||||
newID, err := s.AllocateNextID(ctx, prefix)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate new ID for rename collision: %w", err)
|
||||
}
|
||||
|
||||
// Update incoming issue to use the new ID
|
||||
incoming.ID = newID
|
||||
|
||||
// Delete old ID (only on first attempt)
|
||||
if attempt == 0 {
|
||||
if err := s.DeleteIssue(ctx, oldID); err != nil {
|
||||
return "", fmt.Errorf("failed to delete old ID %s: %w", oldID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create with new ID
|
||||
err = s.CreateIssue(ctx, incoming, "import-rename-collision")
|
||||
if err == nil {
|
||||
// Success!
|
||||
return oldID, nil
|
||||
}
|
||||
|
||||
// Check if it's a UNIQUE constraint error
|
||||
if !sqlite.IsUniqueConstraintError(err) {
|
||||
// Not a UNIQUE constraint error, fail immediately
|
||||
return "", fmt.Errorf("failed to create renamed issue with collision resolution %s: %w", newID, err)
|
||||
}
|
||||
|
||||
// UNIQUE constraint error - retry with new ID
|
||||
if attempt == maxRetries-1 {
|
||||
// Last attempt failed
|
||||
return "", fmt.Errorf("failed to create renamed issue with collision resolution after %d retries: %w", maxRetries, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Note: We don't update text references here because it would be too expensive
|
||||
// to scan all issues during every import. Text references to the old ID will
|
||||
// eventually be cleaned up by manual reference updates or remain as stale.
|
||||
// This is acceptable because the old ID no longer exists in the system.
|
||||
|
||||
return oldID, nil
|
||||
}
|
||||
|
||||
// Check if old ID still exists (it might have been deleted by another clone)
|
||||
@@ -312,14 +373,15 @@ func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.
|
||||
// Verify that target exists with correct content
|
||||
targetCheck, targetErr := s.GetIssue(ctx, incoming.ID)
|
||||
if targetErr == nil && targetCheck != nil && targetCheck.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||
return nil
|
||||
return "", nil
|
||||
}
|
||||
return fmt.Errorf("old ID %s doesn't exist and target ID %s is not as expected", existing.ID, incoming.ID)
|
||||
return "", fmt.Errorf("old ID %s doesn't exist and target ID %s is not as expected", existing.ID, incoming.ID)
|
||||
}
|
||||
|
||||
// Delete old ID
|
||||
if err := s.DeleteIssue(ctx, existing.ID); err != nil {
|
||||
return fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
||||
oldID := existing.ID
|
||||
if err := s.DeleteIssue(ctx, oldID); err != nil {
|
||||
return "", fmt.Errorf("failed to delete old ID %s: %w", oldID, err)
|
||||
}
|
||||
|
||||
// Create with new ID
|
||||
@@ -330,23 +392,23 @@ func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.
|
||||
targetIssue, getErr := s.GetIssue(ctx, incoming.ID)
|
||||
if getErr == nil && targetIssue != nil && targetIssue.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||
// Same content - rename already complete, this is OK
|
||||
return nil
|
||||
return oldID, nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("failed to create renamed issue %s: %w", incoming.ID, err)
|
||||
return "", fmt.Errorf("failed to create renamed issue %s: %w", incoming.ID, err)
|
||||
}
|
||||
|
||||
// Update references from old ID to new ID
|
||||
idMapping := map[string]string{existing.ID: incoming.ID}
|
||||
cache, err := sqlite.BuildReplacementCache(idMapping)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to build replacement cache: %w", err)
|
||||
return "", fmt.Errorf("failed to build replacement cache: %w", err)
|
||||
}
|
||||
|
||||
// Get all issues to update references
|
||||
dbIssues, err := s.SearchIssues(ctx, "", types.IssueFilter{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get issues for reference update: %w", err)
|
||||
return "", fmt.Errorf("failed to get issues for reference update: %w", err)
|
||||
}
|
||||
|
||||
// Update text field references in all issues
|
||||
@@ -375,12 +437,12 @@ func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.
|
||||
|
||||
if len(updates) > 0 {
|
||||
if err := s.UpdateIssue(ctx, issue.ID, updates, "import-rename"); err != nil {
|
||||
return fmt.Errorf("failed to update references in issue %s: %w", issue.ID, err)
|
||||
return "", fmt.Errorf("failed to update references in issue %s: %w", issue.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return oldID, nil
|
||||
}
|
||||
|
||||
// upsertIssues creates new issues or updates existing ones using content-first matching
|
||||
@@ -422,9 +484,14 @@ func upsertIssues(ctx context.Context, sqliteStore *sqlite.SQLiteStorage, issues
|
||||
} else {
|
||||
// Same content, different ID - rename detected
|
||||
if !opts.SkipUpdate {
|
||||
if err := handleRename(ctx, sqliteStore, existing, incoming); err != nil {
|
||||
deletedID, err := handleRename(ctx, sqliteStore, existing, incoming)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to handle rename %s -> %s: %w", existing.ID, incoming.ID, err)
|
||||
}
|
||||
// Remove the deleted ID from the map to prevent stale references
|
||||
if deletedID != "" {
|
||||
delete(dbByID, deletedID)
|
||||
}
|
||||
result.Updated++
|
||||
} else {
|
||||
result.Skipped++
|
||||
|
||||
@@ -636,6 +636,16 @@ func (s *SQLiteStorage) getNextIDForPrefix(ctx context.Context, prefix string) (
|
||||
return nextID, nil
|
||||
}
|
||||
|
||||
// AllocateNextID generates the next issue ID for a given prefix.
|
||||
// This is a public wrapper around getNextIDForPrefix for use by other packages.
|
||||
func (s *SQLiteStorage) AllocateNextID(ctx context.Context, prefix string) (string, error) {
|
||||
nextID, err := s.getNextIDForPrefix(ctx, prefix)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return fmt.Sprintf("%s-%d", prefix, nextID), nil
|
||||
}
|
||||
|
||||
// SyncAllCounters synchronizes all ID counters based on existing issues in the database
|
||||
// This scans all issues and updates counters to prevent ID collisions with auto-generated IDs
|
||||
// Note: This unconditionally overwrites counter values, allowing them to decrease after deletions
|
||||
|
||||
Reference in New Issue
Block a user