fix(storage): add batch ID conflict detection and fix schema indexes

- Add checkForExistingIDs function to detect duplicate IDs within batch
  and conflicts with existing database entries before insert
- Remove thread_id index creation from schema.go since thread_id column
  is added by migration 020_edge_consolidation.go

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-18 19:11:50 -08:00
parent 2ea1b74f43
commit 9dc34da64a
2 changed files with 60 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/steveyegge/beads/internal/types"
@@ -101,6 +102,57 @@ func bulkMarkDirty(ctx context.Context, conn *sql.Conn, issues []*types.Issue) e
return markDirtyBatch(ctx, conn, issues)
}
// checkForExistingIDs verifies that:
//  1. There are no duplicate IDs within the batch itself
//  2. None of the issue IDs already exist in the database
//
// Issues with an empty ID are ignored by both checks. An empty batch, or a
// batch in which no issue carries an ID, passes without touching the database.
//
// Returns an error if any conflicts are found, ensuring CreateIssues fails atomically
// rather than silently skipping duplicates via INSERT OR IGNORE. A query failure
// other than sql.ErrNoRows is wrapped and returned.
//
// The database lookup is issued in bounded chunks so that very large batches
// do not exceed SQLite's per-statement bound-parameter limit
// (SQLITE_MAX_VARIABLE_NUMBER, 999 by default in SQLite releases before 3.32.0).
func checkForExistingIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
	if len(issues) == 0 {
		return nil
	}

	// Collect the non-empty IDs, rejecting duplicates within the batch itself.
	seenIDs := make(map[string]struct{}, len(issues))
	ids := make([]string, 0, len(issues))
	for _, issue := range issues {
		if issue.ID == "" {
			continue
		}
		if _, dup := seenIDs[issue.ID]; dup {
			return fmt.Errorf("duplicate issue ID within batch: %s", issue.ID)
		}
		seenIDs[issue.ID] = struct{}{}
		ids = append(ids, issue.ID)
	}

	// Kept comfortably below the most conservative default SQLite parameter
	// limit so the check works regardless of how the driver was compiled.
	const maxIDsPerQuery = 500

	// Look for existing IDs one chunk at a time; the loop body never runs when
	// there are no IDs to check.
	for start := 0; start < len(ids); start += maxIDsPerQuery {
		end := start + maxIDsPerQuery
		if end > len(ids) {
			end = len(ids)
		}
		chunk := ids[start:end]

		args := make([]interface{}, len(chunk))
		for i, id := range chunk {
			args[i] = id
		}
		// Produces "?,?,...,?" with exactly one placeholder per ID in the chunk.
		placeholders := strings.TrimSuffix(strings.Repeat("?,", len(chunk)), ",")
		query := fmt.Sprintf("SELECT id FROM issues WHERE id IN (%s) LIMIT 1", placeholders)

		var existingID string
		err := conn.QueryRowContext(ctx, query, args...).Scan(&existingID)
		switch {
		case err == nil:
			// A row came back, so at least one ID in this chunk is already taken.
			return fmt.Errorf("issue ID %s already exists", existingID)
		case err != sql.ErrNoRows:
			// Anything other than "no rows" is an unexpected query failure.
			return fmt.Errorf("failed to check for existing IDs: %w", err)
		}
	}
	return nil
}
// CreateIssues creates multiple issues atomically in a single transaction.
// This provides significant performance improvements over calling CreateIssue in a loop:
// - Single connection acquisition
@@ -209,6 +261,11 @@ func (s *SQLiteStorage) CreateIssuesWithFullOptions(ctx context.Context, issues
return wrapDBError("generate batch IDs", err)
}
// Phase 3.5: Check for conflicts with existing IDs in database
if err := checkForExistingIDs(ctx, conn, issues); err != nil {
return err
}
// Phase 4: Bulk insert issues
if err := bulkInsertIssues(ctx, conn, issues); err != nil {
return wrapDBError("bulk insert issues", err)

View File

@@ -59,8 +59,9 @@ CREATE INDEX IF NOT EXISTS idx_dependencies_issue ON dependencies(issue_id);
CREATE INDEX IF NOT EXISTS idx_dependencies_depends_on ON dependencies(depends_on_id);
CREATE INDEX IF NOT EXISTS idx_dependencies_depends_on_type ON dependencies(depends_on_id, type);
CREATE INDEX IF NOT EXISTS idx_dependencies_depends_on_type_issue ON dependencies(depends_on_id, type, issue_id);
CREATE INDEX IF NOT EXISTS idx_dependencies_thread ON dependencies(thread_id) WHERE thread_id != '';
CREATE INDEX IF NOT EXISTS idx_dependencies_thread_type ON dependencies(thread_id, type) WHERE thread_id != '';
-- NOTE: idx_dependencies_thread and idx_dependencies_thread_type are created by
-- migration 020_edge_consolidation.go after adding the thread_id column.
-- They cannot be in the schema because existing databases may not have thread_id yet.
-- Labels table
CREATE TABLE IF NOT EXISTS labels (