Add external_ref UNIQUE constraint and validation

- Add migration for UNIQUE index on external_ref column (bd-897a)
- Add validation for duplicate external_ref in batch imports (bd-7315)
- Add query planner test to verify index usage (bd-f9a1)
- Add concurrent import tests for external_ref (bd-3f6a)

The migration detects existing duplicates and fails gracefully.
Batch imports now reject duplicates with clear error messages.
Tests verify the index is actually used by SQLite query planner.

Amp-Thread-ID: https://ampcode.com/threads/T-45ca66ed-3912-46c4-963c-caa7724a9a2f
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Steve Yegge
2025-11-02 16:27:34 -08:00
parent 10029d0baf
commit 178a43dd5d
5 changed files with 426 additions and 0 deletions

View File

@@ -88,6 +88,11 @@ func ImportIssues(ctx context.Context, dbPath string, store storage.Storage, iss
return result, err return result, err
} }
// Validate no duplicate external_ref values in batch
if err := validateNoDuplicateExternalRefs(issues); err != nil {
return result, err
}
// Detect and resolve collisions // Detect and resolve collisions
issues, err = detectUpdates(ctx, sqliteStore, issues, opts, result) issues, err = detectUpdates(ctx, sqliteStore, issues, opts, result)
if err != nil { if err != nil {
@@ -672,3 +677,28 @@ func GetPrefixList(prefixes map[string]int) []string {
} }
return result return result
} }
func validateNoDuplicateExternalRefs(issues []*types.Issue) error {
seen := make(map[string][]string)
for _, issue := range issues {
if issue.ExternalRef != nil && *issue.ExternalRef != "" {
ref := *issue.ExternalRef
seen[ref] = append(seen[ref], issue.ID)
}
}
var duplicates []string
for ref, issueIDs := range seen {
if len(issueIDs) > 1 {
duplicates = append(duplicates, fmt.Sprintf("external_ref '%s' appears in issues: %v", ref, issueIDs))
}
}
if len(duplicates) > 0 {
sort.Strings(duplicates)
return fmt.Errorf("batch import contains duplicate external_ref values:\n%s", strings.Join(duplicates, "\n"))
}
return nil
}

View File

@@ -2,6 +2,8 @@ package importer
import ( import (
"context" "context"
"strings"
"sync"
"testing" "testing"
"time" "time"
@@ -883,3 +885,209 @@ func TestGetPrefixList(t *testing.T) {
}) })
} }
} }
func TestValidateNoDuplicateExternalRefs(t *testing.T) {
t.Run("no external_ref values", func(t *testing.T) {
issues := []*types.Issue{
{ID: "bd-1", Title: "Issue 1"},
{ID: "bd-2", Title: "Issue 2"},
}
err := validateNoDuplicateExternalRefs(issues)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
})
t.Run("unique external_ref values", func(t *testing.T) {
ref1 := "JIRA-1"
ref2 := "JIRA-2"
issues := []*types.Issue{
{ID: "bd-1", Title: "Issue 1", ExternalRef: &ref1},
{ID: "bd-2", Title: "Issue 2", ExternalRef: &ref2},
}
err := validateNoDuplicateExternalRefs(issues)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
})
t.Run("duplicate external_ref values", func(t *testing.T) {
ref1 := "JIRA-1"
ref2 := "JIRA-1"
issues := []*types.Issue{
{ID: "bd-1", Title: "Issue 1", ExternalRef: &ref1},
{ID: "bd-2", Title: "Issue 2", ExternalRef: &ref2},
}
err := validateNoDuplicateExternalRefs(issues)
if err == nil {
t.Error("Expected error for duplicate external_ref, got nil")
}
if err != nil && !strings.Contains(err.Error(), "duplicate external_ref values") {
t.Errorf("Expected error about duplicates, got: %v", err)
}
})
t.Run("multiple duplicates", func(t *testing.T) {
jira1 := "JIRA-1"
jira2 := "JIRA-2"
issues := []*types.Issue{
{ID: "bd-1", Title: "Issue 1", ExternalRef: &jira1},
{ID: "bd-2", Title: "Issue 2", ExternalRef: &jira1},
{ID: "bd-3", Title: "Issue 3", ExternalRef: &jira2},
{ID: "bd-4", Title: "Issue 4", ExternalRef: &jira2},
}
err := validateNoDuplicateExternalRefs(issues)
if err == nil {
t.Error("Expected error for duplicate external_ref, got nil")
}
if err != nil {
if !strings.Contains(err.Error(), "JIRA-1") || !strings.Contains(err.Error(), "JIRA-2") {
t.Errorf("Expected error to mention both JIRA-1 and JIRA-2, got: %v", err)
}
}
})
t.Run("ignores empty external_ref", func(t *testing.T) {
empty := ""
ref1 := "JIRA-1"
issues := []*types.Issue{
{ID: "bd-1", Title: "Issue 1", ExternalRef: &empty},
{ID: "bd-2", Title: "Issue 2", ExternalRef: &empty},
{ID: "bd-3", Title: "Issue 3", ExternalRef: &ref1},
}
err := validateNoDuplicateExternalRefs(issues)
if err != nil {
t.Errorf("Expected no error for empty refs, got: %v", err)
}
})
}
func TestConcurrentExternalRefImports(t *testing.T) {
t.Run("sequential imports with same external_ref are detected as updates", func(t *testing.T) {
store, err := sqlite.New(":memory:")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close()
ctx := context.Background()
if err := store.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
t.Fatalf("Failed to set prefix: %v", err)
}
externalRef := "JIRA-100"
issue1 := &types.Issue{
ID: "bd-1",
Title: "First import",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
ExternalRef: &externalRef,
}
result1, err := ImportIssues(ctx, "", store, []*types.Issue{issue1}, Options{})
if err != nil {
t.Fatalf("First import failed: %v", err)
}
if result1.Created != 1 {
t.Errorf("Expected 1 created, got %d", result1.Created)
}
issue2 := &types.Issue{
ID: "bd-2",
Title: "Second import (different ID, same external_ref)",
Status: types.StatusInProgress,
Priority: 2,
IssueType: types.TypeTask,
ExternalRef: &externalRef,
UpdatedAt: time.Now().Add(1 * time.Hour),
}
result2, err := ImportIssues(ctx, "", store, []*types.Issue{issue2}, Options{})
if err != nil {
t.Fatalf("Second import failed: %v", err)
}
if result2.Updated != 1 {
t.Errorf("Expected 1 updated, got %d (created: %d)", result2.Updated, result2.Created)
}
finalIssue, err := store.GetIssueByExternalRef(ctx, externalRef)
if err != nil {
t.Fatalf("Failed to get final issue: %v", err)
}
if finalIssue.ID != "bd-1" {
t.Errorf("Expected final issue ID to be bd-1, got %s", finalIssue.ID)
}
if finalIssue.Title != "Second import (different ID, same external_ref)" {
t.Errorf("Expected title to be updated, got %s", finalIssue.Title)
}
})
t.Run("concurrent updates to same external_ref with different timestamps", func(t *testing.T) {
store, err := sqlite.New(":memory:")
if err != nil {
t.Fatalf("Failed to create store: %v", err)
}
defer store.Close()
ctx := context.Background()
if err := store.SetConfig(ctx, "issue_prefix", "bd"); err != nil {
t.Fatalf("Failed to set prefix: %v", err)
}
externalRef := "JIRA-200"
existing := &types.Issue{
ID: "bd-1",
Title: "Existing issue",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
ExternalRef: &externalRef,
}
if err := store.CreateIssue(ctx, existing, "test"); err != nil {
t.Fatalf("Failed to create existing issue: %v", err)
}
var wg sync.WaitGroup
results := make([]*Result, 3)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
updated := &types.Issue{
ID: "bd-import-" + string(rune('1'+idx)),
Title: "Updated from worker " + string(rune('A'+idx)),
Status: types.StatusInProgress,
Priority: 2,
IssueType: types.TypeTask,
ExternalRef: &externalRef,
UpdatedAt: time.Now().Add(time.Duration(idx) * time.Second),
}
result, _ := ImportIssues(ctx, "", store, []*types.Issue{updated}, Options{})
results[idx] = result
}(i)
}
wg.Wait()
finalIssue, err := store.GetIssueByExternalRef(ctx, externalRef)
if err != nil {
t.Fatalf("Failed to get final issue: %v", err)
}
if finalIssue == nil {
t.Error("Expected final issue to exist")
}
t.Logf("Final issue title: %s, status: %s", finalIssue.Title, finalIssue.Status)
})
}

View File

@@ -279,3 +279,68 @@ func TestExternalRefIndex(t *testing.T) {
t.Error("Expected idx_issues_external_ref index to exist") t.Error("Expected idx_issues_external_ref index to exist")
} }
} }
func TestExternalRefIndexUsage(t *testing.T) {
ctx := context.Background()
s, cleanup := setupTestDB(t)
defer cleanup()
externalRef := "JIRA-123"
issue := &types.Issue{
ID: "bd-test-1",
Title: "Test issue",
Status: types.StatusOpen,
Priority: 1,
IssueType: types.TypeTask,
ExternalRef: &externalRef,
}
err := s.CreateIssue(ctx, issue, "test")
if err != nil {
t.Fatalf("Failed to create issue: %v", err)
}
rows, err := s.db.QueryContext(ctx, `
EXPLAIN QUERY PLAN
SELECT id, title, description, design, acceptance_criteria, notes, status, priority, issue_type, assignee,
created_at, updated_at, closed_at, external_ref,
compaction_level, compacted_at, compacted_at_commit, original_size
FROM issues
WHERE external_ref = ?
`, externalRef)
if err != nil {
t.Fatalf("Failed to get query plan: %v", err)
}
defer rows.Close()
var planFound bool
var indexUsed bool
for rows.Next() {
var id, parent, notused int
var detail string
if err := rows.Scan(&id, &parent, &notused, &detail); err != nil {
t.Fatalf("Failed to scan query plan row: %v", err)
}
planFound = true
if detail == "SEARCH TABLE issues USING INDEX idx_issues_external_ref (external_ref=?)" ||
detail == "SEARCH issues USING INDEX idx_issues_external_ref (external_ref=?)" ||
detail == "SEARCH TABLE issues USING INDEX idx_issues_external_ref_unique (external_ref=?)" ||
detail == "SEARCH issues USING INDEX idx_issues_external_ref_unique (external_ref=?)" {
indexUsed = true
}
}
if err := rows.Err(); err != nil {
t.Fatalf("Error reading query plan: %v", err)
}
if !planFound {
t.Error("Expected query plan output, got none")
}
if !indexUsed {
t.Error("Expected query planner to use idx_issues_external_ref index, but it didn't")
}
}

View File

@@ -4,6 +4,7 @@ package sqlite
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"strings"
"github.com/steveyegge/beads/internal/types" "github.com/steveyegge/beads/internal/types"
) )
@@ -27,6 +28,7 @@ var migrations = []Migration{
{"compacted_at_commit_column", migrateCompactedAtCommitColumn}, {"compacted_at_commit_column", migrateCompactedAtCommitColumn},
{"export_hashes_table", migrateExportHashesTable}, {"export_hashes_table", migrateExportHashesTable},
{"content_hash_column", migrateContentHashColumn}, {"content_hash_column", migrateContentHashColumn},
{"external_ref_unique", migrateExternalRefUnique},
} }
// MigrationInfo contains metadata about a migration for inspection // MigrationInfo contains metadata about a migration for inspection
@@ -61,6 +63,7 @@ func getMigrationDescription(name string) string {
"compacted_at_commit_column": "Adds compacted_at_commit to snapshots table", "compacted_at_commit_column": "Adds compacted_at_commit to snapshots table",
"export_hashes_table": "Adds export_hashes table for idempotent exports", "export_hashes_table": "Adds export_hashes table for idempotent exports",
"content_hash_column": "Adds content_hash column for collision resolution", "content_hash_column": "Adds content_hash column for collision resolution",
"external_ref_unique": "Adds UNIQUE constraint on external_ref column",
} }
if desc, ok := descriptions[name]; ok { if desc, ok := descriptions[name]; ok {
@@ -510,3 +513,62 @@ func migrateContentHashColumn(db *sql.DB) error {
// Column already exists // Column already exists
return nil return nil
} }
func migrateExternalRefUnique(db *sql.DB) error {
var hasConstraint bool
err := db.QueryRow(`
SELECT COUNT(*) > 0
FROM sqlite_master
WHERE type = 'index'
AND name = 'idx_issues_external_ref_unique'
`).Scan(&hasConstraint)
if err != nil {
return fmt.Errorf("failed to check for UNIQUE constraint: %w", err)
}
if hasConstraint {
return nil
}
existingDuplicates, err := findExternalRefDuplicates(db)
if err != nil {
return fmt.Errorf("failed to check for duplicate external_ref values: %w", err)
}
if len(existingDuplicates) > 0 {
return fmt.Errorf("cannot add UNIQUE constraint: found %d duplicate external_ref values (resolve with 'bd duplicates' or manually)", len(existingDuplicates))
}
_, err = db.Exec(`CREATE UNIQUE INDEX IF NOT EXISTS idx_issues_external_ref_unique ON issues(external_ref) WHERE external_ref IS NOT NULL`)
if err != nil {
return fmt.Errorf("failed to create UNIQUE index on external_ref: %w", err)
}
return nil
}
func findExternalRefDuplicates(db *sql.DB) (map[string][]string, error) {
rows, err := db.Query(`
SELECT external_ref, GROUP_CONCAT(id, ',') as ids
FROM issues
WHERE external_ref IS NOT NULL
GROUP BY external_ref
HAVING COUNT(*) > 1
`)
if err != nil {
return nil, err
}
defer rows.Close()
duplicates := make(map[string][]string)
for rows.Next() {
var externalRef, idsCSV string
if err := rows.Scan(&externalRef, &idsCSV); err != nil {
return nil, err
}
ids := strings.Split(idsCSV, ",")
duplicates[externalRef] = ids
}
return duplicates, rows.Err()
}

View File

@@ -3,6 +3,7 @@ package sqlite
import ( import (
"context" "context"
"database/sql" "database/sql"
"strings"
"testing" "testing"
"github.com/steveyegge/beads/internal/types" "github.com/steveyegge/beads/internal/types"
@@ -289,6 +290,66 @@ func TestMigrateExportHashesTable(t *testing.T) {
} }
} }
func TestMigrateExternalRefUnique(t *testing.T) {
t.Run("creates unique index on external_ref", func(t *testing.T) {
store, cleanup := setupTestDB(t)
defer cleanup()
db := store.db
externalRef1 := "JIRA-1"
externalRef2 := "JIRA-2"
issue1 := types.Issue{ID: "bd-1", Title: "Issue 1", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, ExternalRef: &externalRef1}
if err := store.CreateIssue(context.Background(), &issue1, "test"); err != nil {
t.Fatalf("failed to create issue1: %v", err)
}
issue2 := types.Issue{ID: "bd-2", Title: "Issue 2", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, ExternalRef: &externalRef2}
if err := store.CreateIssue(context.Background(), &issue2, "test"); err != nil {
t.Fatalf("failed to create issue2: %v", err)
}
if err := migrateExternalRefUnique(db); err != nil {
t.Fatalf("failed to migrate external_ref unique constraint: %v", err)
}
issue3 := types.Issue{ID: "bd-3", Title: "Issue 3", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, ExternalRef: &externalRef1}
err := store.CreateIssue(context.Background(), &issue3, "test")
if err == nil {
t.Error("Expected error when creating issue with duplicate external_ref, got nil")
}
})
t.Run("fails if duplicates exist", func(t *testing.T) {
store, cleanup := setupTestDB(t)
defer cleanup()
db := store.db
_, err := db.Exec(`DROP INDEX IF EXISTS idx_issues_external_ref_unique`)
if err != nil {
t.Fatalf("failed to drop index: %v", err)
}
externalRef := "JIRA-1"
issue1 := types.Issue{ID: "bd-100", Title: "Issue 1", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask, ExternalRef: &externalRef}
if err := store.CreateIssue(context.Background(), &issue1, "test"); err != nil {
t.Fatalf("failed to create issue1: %v", err)
}
_, err = db.Exec(`INSERT INTO issues (id, title, status, priority, issue_type, external_ref, created_at, updated_at) VALUES (?, ?, 'open', 1, 'task', ?, datetime('now'), datetime('now'))`, "bd-101", "Issue 2", externalRef)
if err != nil {
t.Fatalf("failed to create duplicate: %v", err)
}
err = migrateExternalRefUnique(db)
if err == nil {
t.Error("Expected migration to fail with duplicates present")
}
if !strings.Contains(err.Error(), "duplicate external_ref values") {
t.Errorf("Expected error about duplicates, got: %v", err)
}
})
}
func TestMigrateContentHashColumn(t *testing.T) { func TestMigrateContentHashColumn(t *testing.T) {
t.Run("adds content_hash column if missing", func(t *testing.T) { t.Run("adds content_hash column if missing", func(t *testing.T) {
s, cleanup := setupTestDB(t) s, cleanup := setupTestDB(t)