Files
beads/internal/storage/sqlite/batch_ops.go
Steve Yegge 9dc34da64a fix(storage): add batch ID conflict detection and fix schema indexes
- Add checkForExistingIDs function to detect duplicate IDs within batch
  and conflicts with existing database entries before insert
- Remove thread_id index creation from schema.go since thread_id column
  is added by migration 020_edge_consolidation.go

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-18 19:11:50 -08:00

291 lines
10 KiB
Go

package sqlite
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/steveyegge/beads/internal/types"
)
// validateBatchIssues validates all issues in a batch and sets timestamps if not provided
// Uses built-in statuses only for backward compatibility.
func validateBatchIssues(issues []*types.Issue) error {
return validateBatchIssuesWithCustomStatuses(issues, nil)
}
// validateBatchIssuesWithCustomStatuses validates all issues in a batch,
// allowing custom statuses in addition to built-in ones (bd-1pj6).
func validateBatchIssuesWithCustomStatuses(issues []*types.Issue, customStatuses []string) error {
now := time.Now()
for i, issue := range issues {
if issue == nil {
return fmt.Errorf("issue %d is nil", i)
}
// Only set timestamps if not already provided
if issue.CreatedAt.IsZero() {
issue.CreatedAt = now
}
if issue.UpdatedAt.IsZero() {
issue.UpdatedAt = now
}
// Defensive fix for closed_at invariant (GH#523): older versions of bd could
// close issues without setting closed_at. Fix by using max(created_at, updated_at) + 1s.
if issue.Status == types.StatusClosed && issue.ClosedAt == nil {
maxTime := issue.CreatedAt
if issue.UpdatedAt.After(maxTime) {
maxTime = issue.UpdatedAt
}
closedAt := maxTime.Add(time.Second)
issue.ClosedAt = &closedAt
}
// Defensive fix for deleted_at invariant: tombstones must have deleted_at
if issue.Status == types.StatusTombstone && issue.DeletedAt == nil {
maxTime := issue.CreatedAt
if issue.UpdatedAt.After(maxTime) {
maxTime = issue.UpdatedAt
}
deletedAt := maxTime.Add(time.Second)
issue.DeletedAt = &deletedAt
}
if err := issue.ValidateWithCustomStatuses(customStatuses); err != nil {
return fmt.Errorf("validation failed for issue %d: %w", i, err)
}
}
return nil
}
// generateBatchIDs generates IDs for all issues that need them atomically
func (s *SQLiteStorage) generateBatchIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue, actor string, orphanHandling OrphanHandling, skipPrefixValidation bool) error {
// Get prefix from config (needed for both generation and validation)
var prefix string
err := conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
if err == sql.ErrNoRows || prefix == "" {
// CRITICAL: Reject operation if issue_prefix config is missing (bd-166)
return fmt.Errorf("database not initialized: issue_prefix config is missing (run 'bd init --prefix <prefix>' first)")
} else if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
// Generate or validate IDs for all issues
if err := EnsureIDs(ctx, conn, prefix, issues, actor, orphanHandling, skipPrefixValidation); err != nil {
return wrapDBError("ensure IDs", err)
}
// Compute content hashes
for i := range issues {
if issues[i].ContentHash == "" {
issues[i].ContentHash = issues[i].ComputeContentHash()
}
}
return nil
}
// bulkInsertIssues delegates to insertIssues helper
func bulkInsertIssues(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
return insertIssues(ctx, conn, issues)
}
// bulkRecordEvents delegates to recordCreatedEvents helper
func bulkRecordEvents(ctx context.Context, conn *sql.Conn, issues []*types.Issue, actor string) error {
return recordCreatedEvents(ctx, conn, issues, actor)
}
// bulkMarkDirty delegates to markDirtyBatch helper
func bulkMarkDirty(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
return markDirtyBatch(ctx, conn, issues)
}
// checkForExistingIDs verifies that:
// 1. There are no duplicate IDs within the batch itself
// 2. None of the issue IDs already exist in the database
// Returns an error if any conflicts are found, ensuring CreateIssues fails atomically
// rather than silently skipping duplicates via INSERT OR IGNORE.
func checkForExistingIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
if len(issues) == 0 {
return nil
}
// Build list of IDs to check and detect duplicates within batch
seenIDs := make(map[string]bool)
ids := make([]string, 0, len(issues))
for _, issue := range issues {
if issue.ID != "" {
// Check for duplicates within the batch
if seenIDs[issue.ID] {
return fmt.Errorf("duplicate issue ID within batch: %s", issue.ID)
}
seenIDs[issue.ID] = true
ids = append(ids, issue.ID)
}
}
if len(ids) == 0 {
return nil
}
// Check for existing IDs in database using a single query with IN clause
placeholders := make([]string, len(ids))
args := make([]interface{}, len(ids))
for i, id := range ids {
placeholders[i] = "?"
args[i] = id
}
query := fmt.Sprintf("SELECT id FROM issues WHERE id IN (%s) LIMIT 1", strings.Join(placeholders, ","))
var existingID string
err := conn.QueryRowContext(ctx, query, args...).Scan(&existingID)
if err == nil {
// Found an existing ID
return fmt.Errorf("issue ID %s already exists", existingID)
}
if err != sql.ErrNoRows {
// Unexpected error
return fmt.Errorf("failed to check for existing IDs: %w", err)
}
return nil
}
// CreateIssues creates multiple issues atomically in a single transaction.
// This provides significant performance improvements over calling CreateIssue in a loop:
// - Single connection acquisition
// - Single transaction
// - Atomic ID range reservation (one counter update for N issues)
// - All-or-nothing atomicity
//
// Expected 5-10x speedup for batches of 10+ issues.
// CreateIssues creates multiple issues atomically in a single transaction.
//
// This method is optimized for bulk issue creation and provides significant
// performance improvements over calling CreateIssue in a loop:
// - Single database connection and transaction
// - Atomic ID range reservation (one counter update for N IDs)
// - All-or-nothing semantics (rolls back on any error)
// - 5-15x faster than sequential CreateIssue calls
//
// All issues are validated before any database changes occur. If any issue
// fails validation, the entire batch is rejected.
//
// ID Assignment:
// - Issues with empty ID get auto-generated IDs from a reserved range
// - Issues with explicit IDs use those IDs (caller must ensure uniqueness)
// - Mix of explicit and auto-generated IDs is supported
//
// Timestamps:
// - All issues in the batch receive identical created_at/updated_at timestamps
// - This reflects that they were created as a single atomic operation
//
// Usage:
// // Bulk import from external source
// issues := []*types.Issue{...}
// if err := store.CreateIssues(ctx, issues, "import"); err != nil {
// return err
// }
//
// // After importing with explicit IDs, sync counters to prevent collisions
// REMOVED (bd-c7af): SyncAllCounters example - no longer needed with hash IDs
//
// Performance:
// - 100 issues: ~30ms (vs ~900ms with CreateIssue loop)
// - 1000 issues: ~950ms (vs estimated 9s with CreateIssue loop)
//
// When to use:
// - Bulk imports from external systems (use CreateIssues)
// - Creating multiple related issues at once (use CreateIssues)
// - Single issue creation (use CreateIssue for simplicity)
// - Interactive user operations (use CreateIssue)
func (s *SQLiteStorage) CreateIssues(ctx context.Context, issues []*types.Issue, actor string) error {
// Default to OrphanResurrect for backward compatibility
return s.CreateIssuesWithOptions(ctx, issues, actor, OrphanResurrect)
}
// BatchCreateOptions contains options for batch issue creation
type BatchCreateOptions struct {
OrphanHandling OrphanHandling // How to handle missing parent issues
SkipPrefixValidation bool // Skip prefix validation for existing IDs (used during import)
}
// CreateIssuesWithOptions creates multiple issues with configurable orphan handling
func (s *SQLiteStorage) CreateIssuesWithOptions(ctx context.Context, issues []*types.Issue, actor string, orphanHandling OrphanHandling) error {
return s.CreateIssuesWithFullOptions(ctx, issues, actor, BatchCreateOptions{
OrphanHandling: orphanHandling,
SkipPrefixValidation: false,
})
}
// CreateIssuesWithFullOptions creates multiple issues with full options control
func (s *SQLiteStorage) CreateIssuesWithFullOptions(ctx context.Context, issues []*types.Issue, actor string, opts BatchCreateOptions) error {
if len(issues) == 0 {
return nil
}
// Fetch custom statuses for validation (bd-1pj6)
customStatuses, err := s.GetCustomStatuses(ctx)
if err != nil {
return fmt.Errorf("failed to get custom statuses: %w", err)
}
// Phase 1: Validate all issues first (fail-fast, with custom status support)
if err := validateBatchIssuesWithCustomStatuses(issues, customStatuses); err != nil {
return err
}
// Phase 2: Acquire connection and start transaction
conn, err := s.db.Conn(ctx)
if err != nil {
return fmt.Errorf("failed to acquire connection: %w", err)
}
defer func() { _ = conn.Close() }()
// Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load (bd-ola6)
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
return fmt.Errorf("failed to begin immediate transaction: %w", err)
}
committed := false
defer func() {
if !committed {
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
}
}()
// Phase 3: Generate IDs for issues that need them
if err := s.generateBatchIDs(ctx, conn, issues, actor, opts.OrphanHandling, opts.SkipPrefixValidation); err != nil {
return wrapDBError("generate batch IDs", err)
}
// Phase 3.5: Check for conflicts with existing IDs in database
if err := checkForExistingIDs(ctx, conn, issues); err != nil {
return err
}
// Phase 4: Bulk insert issues
if err := bulkInsertIssues(ctx, conn, issues); err != nil {
return wrapDBError("bulk insert issues", err)
}
// Phase 5: Record creation events
if err := bulkRecordEvents(ctx, conn, issues, actor); err != nil {
return wrapDBError("record creation events", err)
}
// Phase 6: Mark issues dirty for incremental export
if err := bulkMarkDirty(ctx, conn, issues); err != nil {
return wrapDBError("mark issues dirty", err)
}
// Phase 7: Commit transaction
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
committed = true
return nil
}