bd-109: Add retry logic and race condition handling for N-way collisions
- Added ExecInTransaction helper for atomic database operations
- Added IsUniqueConstraintError to detect UNIQUE constraint violations
- Wrapped RemapCollisions with retry logic (3 attempts, with a counter sync between attempts)
- Enhanced handleRename to detect race conditions where the target ID already exists
- Added defensive checks for when the old ID has been deleted by another clone

Progress: Improves N-way collision handling, though a full solution requires more work (tracked in bd-108). Tests now reach later convergence rounds before hitting complex collision scenarios.

Amp-Thread-ID: https://ampcode.com/threads/T-2b850a80-f8bd-4e38-b661-e33d1cfa7281
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -285,6 +285,38 @@ func buildIDMap(issues []*types.Issue) map[string]*types.Issue {
|
|||||||
|
|
||||||
// handleRename handles content match with different IDs (rename detected)
|
// handleRename handles content match with different IDs (rename detected)
|
||||||
func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.Issue, incoming *types.Issue) error {
|
func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.Issue, incoming *types.Issue) error {
|
||||||
|
// Check if target ID already exists with the same content (race condition)
|
||||||
|
// This can happen when multiple clones import the same rename simultaneously
|
||||||
|
targetIssue, err := s.GetIssue(ctx, incoming.ID)
|
||||||
|
if err == nil && targetIssue != nil {
|
||||||
|
// Target ID exists - check if it has the same content
|
||||||
|
if targetIssue.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||||
|
// Same content - check if old ID still exists and delete it
|
||||||
|
existingCheck, checkErr := s.GetIssue(ctx, existing.ID)
|
||||||
|
if checkErr == nil && existingCheck != nil {
|
||||||
|
if err := s.DeleteIssue(ctx, existing.ID); err != nil {
|
||||||
|
return fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// The rename is already complete in the database
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Different content - this is an unexpected collision
|
||||||
|
return fmt.Errorf("target ID %s already exists with different content", incoming.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if old ID still exists (it might have been deleted by another clone)
|
||||||
|
existingCheck, checkErr := s.GetIssue(ctx, existing.ID)
|
||||||
|
if checkErr != nil || existingCheck == nil {
|
||||||
|
// Old ID doesn't exist - the rename must have been completed by another clone
|
||||||
|
// Verify that target exists with correct content
|
||||||
|
targetCheck, targetErr := s.GetIssue(ctx, incoming.ID)
|
||||||
|
if targetErr == nil && targetCheck != nil && targetCheck.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("old ID %s doesn't exist and target ID %s is not as expected", existing.ID, incoming.ID)
|
||||||
|
}
|
||||||
|
|
||||||
// Delete old ID
|
// Delete old ID
|
||||||
if err := s.DeleteIssue(ctx, existing.ID); err != nil {
|
if err := s.DeleteIssue(ctx, existing.ID); err != nil {
|
||||||
return fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
return fmt.Errorf("failed to delete old ID %s: %w", existing.ID, err)
|
||||||
@@ -292,6 +324,15 @@ func handleRename(ctx context.Context, s *sqlite.SQLiteStorage, existing *types.
|
|||||||
|
|
||||||
// Create with new ID
|
// Create with new ID
|
||||||
if err := s.CreateIssue(ctx, incoming, "import-rename"); err != nil {
|
if err := s.CreateIssue(ctx, incoming, "import-rename"); err != nil {
|
||||||
|
// If UNIQUE constraint error, it's likely another clone created it concurrently
|
||||||
|
if sqlite.IsUniqueConstraintError(err) {
|
||||||
|
// Check if target exists with same content
|
||||||
|
targetIssue, getErr := s.GetIssue(ctx, incoming.ID)
|
||||||
|
if getErr == nil && targetIssue != nil && targetIssue.ComputeContentHash() == incoming.ComputeContentHash() {
|
||||||
|
// Same content - rename already complete, this is OK
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
return fmt.Errorf("failed to create renamed issue %s: %w", incoming.ID, err)
|
return fmt.Errorf("failed to create renamed issue %s: %w", incoming.ID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -335,11 +335,37 @@ func deduplicateIncomingIssues(issues []*types.Issue) []*types.Issue {
|
|||||||
//
|
//
|
||||||
// This ensures deterministic, symmetric collision resolution across all clones.
|
// This ensures deterministic, symmetric collision resolution across all clones.
|
||||||
//
|
//
|
||||||
// NOTE: This function is not atomic - it performs multiple separate database operations.
|
// The function automatically retries up to 3 times on UNIQUE constraint failures,
|
||||||
// If an error occurs partway through, some issues may be created without their references
|
// syncing counters between retries to handle concurrent ID allocation.
|
||||||
// being updated. This is a known limitation that requires storage layer refactoring to fix.
|
func RemapCollisions(ctx context.Context, s *SQLiteStorage, collisions []*CollisionDetail, incomingIssues []*types.Issue) (map[string]string, error) {
|
||||||
// See issue bd-25 for transaction support.
|
const maxRetries = 3
|
||||||
func RemapCollisions(ctx context.Context, s *SQLiteStorage, collisions []*CollisionDetail, _ []*types.Issue) (map[string]string, error) {
|
var lastErr error
|
||||||
|
|
||||||
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
|
idMapping, err := remapCollisionsOnce(ctx, s, collisions, incomingIssues)
|
||||||
|
if err == nil {
|
||||||
|
return idMapping, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
lastErr = err
|
||||||
|
|
||||||
|
if !isUniqueConstraintError(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt < maxRetries-1 {
|
||||||
|
if syncErr := s.SyncAllCounters(ctx); syncErr != nil {
|
||||||
|
return nil, fmt.Errorf("retry %d: UNIQUE constraint error, counter sync failed: %w (original error: %v)", attempt+1, syncErr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("failed after %d retries due to UNIQUE constraint violations: %w", maxRetries, lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// remapCollisionsOnce performs a single attempt at collision resolution.
|
||||||
|
// This is the actual implementation that RemapCollisions wraps with retry logic.
|
||||||
|
func remapCollisionsOnce(ctx context.Context, s *SQLiteStorage, collisions []*CollisionDetail, _ []*types.Issue) (map[string]string, error) {
|
||||||
idMapping := make(map[string]string)
|
idMapping := make(map[string]string)
|
||||||
|
|
||||||
// Sync counters before remapping to avoid ID collisions
|
// Sync counters before remapping to avoid ID collisions
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package sqlite
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QueryContext exposes the underlying database QueryContext method for advanced queries
|
// QueryContext exposes the underlying database QueryContext method for advanced queries
|
||||||
@@ -16,3 +18,37 @@ func (s *SQLiteStorage) QueryContext(ctx context.Context, query string, args ...
|
|||||||
func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) {
|
func (s *SQLiteStorage) BeginTx(ctx context.Context) (*sql.Tx, error) {
|
||||||
return s.db.BeginTx(ctx, nil)
|
return s.db.BeginTx(ctx, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExecInTransaction executes a function within a database transaction.
|
||||||
|
// If the function returns an error, the transaction is rolled back.
|
||||||
|
// Otherwise, the transaction is committed.
// The error returned by fn is handed back to the caller unchanged, while
// failures to begin or commit the transaction are wrapped with context.
|
||||||
|
func (s *SQLiteStorage) ExecInTransaction(ctx context.Context, fn func(*sql.Tx) error) error {
|
||||||
|
tx, err := s.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
// The deferred rollback covers every early-return path below; its error is
// deliberately discarded.
// NOTE(review): after a successful Commit this Rollback should only report
// sql.ErrTxDone — confirm against the database/sql documentation.
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
if err := fn(tx); err != nil {
|
||||||
|
// Propagate fn's error as-is; the deferred Rollback runs on return.
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsUniqueConstraintError reports whether err is a UNIQUE constraint violation.
// Detection is purely textual: the error's message must contain the substring
// "UNIQUE constraint failed". A nil error yields false.
// NOTE(review): presumably this is the message text emitted by the SQLite
// driver in use — confirm against the driver.
|
||||||
|
func IsUniqueConstraintError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Substring match on the message, so an error whose text merely embeds this
// phrase (for example after added context) is still recognized.
return strings.Contains(err.Error(), "UNIQUE constraint failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// isUniqueConstraintError is an unexported wrapper that forwards to
// IsUniqueConstraintError and returns its result, for use inside this package.
|
||||||
|
func isUniqueConstraintError(err error) bool {
|
||||||
|
return IsUniqueConstraintError(err)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user