Add insertIssuesStrict function that uses plain INSERT instead of INSERT OR IGNORE. Update bulkInsertIssues and transactional CreateIssues to use the strict variant. This fixes a race condition where INSERT OR IGNORE could silently skip duplicate insertions, but the code would still attempt to record events for those "inserted" issues, causing FOREIGN KEY constraint failures. The strict INSERT will now fail explicitly if a duplicate is encountered, which should never happen since checkForExistingIDs runs first within the same transaction. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
326 lines
12 KiB
Go
326 lines
12 KiB
Go
package sqlite
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/steveyegge/beads/internal/types"
|
|
)
|
|
|
|
// validateBatchIssues validates all issues in a batch and sets timestamps if not provided
|
|
// Uses built-in statuses and types only for backward compatibility.
|
|
func validateBatchIssues(issues []*types.Issue) error {
|
|
return validateBatchIssuesWithCustom(issues, nil, nil)
|
|
}
|
|
|
|
// validateBatchIssuesWithCustom validates all issues in a batch,
|
|
// allowing custom statuses and types in addition to built-in ones.
|
|
func validateBatchIssuesWithCustom(issues []*types.Issue, customStatuses, customTypes []string) error {
|
|
now := time.Now()
|
|
for i, issue := range issues {
|
|
if issue == nil {
|
|
return fmt.Errorf("issue %d is nil", i)
|
|
}
|
|
|
|
// Only set timestamps if not already provided
|
|
if issue.CreatedAt.IsZero() {
|
|
issue.CreatedAt = now
|
|
}
|
|
if issue.UpdatedAt.IsZero() {
|
|
issue.UpdatedAt = now
|
|
}
|
|
|
|
// Defensive fix for closed_at invariant (GH#523): older versions of bd could
|
|
// close issues without setting closed_at. Fix by using max(created_at, updated_at) + 1s.
|
|
if issue.Status == types.StatusClosed && issue.ClosedAt == nil {
|
|
maxTime := issue.CreatedAt
|
|
if issue.UpdatedAt.After(maxTime) {
|
|
maxTime = issue.UpdatedAt
|
|
}
|
|
closedAt := maxTime.Add(time.Second)
|
|
issue.ClosedAt = &closedAt
|
|
}
|
|
|
|
// Defensive fix for deleted_at invariant: tombstones must have deleted_at
|
|
if issue.Status == types.StatusTombstone && issue.DeletedAt == nil {
|
|
maxTime := issue.CreatedAt
|
|
if issue.UpdatedAt.After(maxTime) {
|
|
maxTime = issue.UpdatedAt
|
|
}
|
|
deletedAt := maxTime.Add(time.Second)
|
|
issue.DeletedAt = &deletedAt
|
|
}
|
|
|
|
if err := issue.ValidateWithCustom(customStatuses, customTypes); err != nil {
|
|
return fmt.Errorf("validation failed for issue %d: %w", i, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// generateBatchIDs generates IDs for all issues that need them atomically
|
|
func (s *SQLiteStorage) generateBatchIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue, actor string, orphanHandling OrphanHandling, skipPrefixValidation bool) error {
|
|
// Get prefix from config (needed for both generation and validation)
|
|
var prefix string
|
|
err := conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
|
|
if err == sql.ErrNoRows || prefix == "" {
|
|
// CRITICAL: Reject operation if issue_prefix config is missing
|
|
return fmt.Errorf("database not initialized: issue_prefix config is missing (run 'bd init --prefix <prefix>' first)")
|
|
} else if err != nil {
|
|
return fmt.Errorf("failed to get config: %w", err)
|
|
}
|
|
|
|
// Generate or validate IDs for all issues
|
|
if err := EnsureIDs(ctx, conn, prefix, issues, actor, orphanHandling, skipPrefixValidation); err != nil {
|
|
return wrapDBError("ensure IDs", err)
|
|
}
|
|
|
|
// Compute content hashes
|
|
for i := range issues {
|
|
if issues[i].ContentHash == "" {
|
|
issues[i].ContentHash = issues[i].ComputeContentHash()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// bulkInsertIssues delegates to insertIssuesStrict helper for fresh issue creation.
|
|
// GH#956: Using strict insert prevents FK constraint errors from silent INSERT OR IGNORE failures.
|
|
func bulkInsertIssues(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
|
|
return insertIssuesStrict(ctx, conn, issues)
|
|
}
|
|
|
|
// bulkRecordEvents delegates to recordCreatedEvents helper
|
|
func bulkRecordEvents(ctx context.Context, conn *sql.Conn, issues []*types.Issue, actor string) error {
|
|
return recordCreatedEvents(ctx, conn, issues, actor)
|
|
}
|
|
|
|
// bulkMarkDirty delegates to markDirtyBatch helper
|
|
func bulkMarkDirty(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
|
|
return markDirtyBatch(ctx, conn, issues)
|
|
}
|
|
|
|
// updateChildCountersForHierarchicalIDs updates child_counters for all hierarchical IDs in the batch.
|
|
// This is called AFTER issues are inserted so that parents exist for the foreign key constraint.
|
|
// (GH#728 fix)
|
|
func updateChildCountersForHierarchicalIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
|
|
for _, issue := range issues {
|
|
if issue.ID == "" {
|
|
continue // Skip issues that were filtered out (e.g., OrphanSkip)
|
|
}
|
|
if parentID, childNum, ok := ParseHierarchicalID(issue.ID); ok {
|
|
// Only update if parent exists (it should after insert, but check to be safe)
|
|
var parentCount int
|
|
if err := conn.QueryRowContext(ctx, `SELECT COUNT(*) FROM issues WHERE id = ?`, parentID).Scan(&parentCount); err != nil {
|
|
return fmt.Errorf("failed to check parent existence for %s: %w", parentID, err)
|
|
}
|
|
if parentCount > 0 {
|
|
if err := ensureChildCounterUpdatedWithConn(ctx, conn, parentID, childNum); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkForExistingIDs verifies that:
|
|
// 1. There are no duplicate IDs within the batch itself
|
|
// 2. None of the issue IDs already exist in the database
|
|
// Returns an error if any conflicts are found, ensuring CreateIssues fails atomically
|
|
// rather than silently skipping duplicates via INSERT OR IGNORE.
|
|
func checkForExistingIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error {
|
|
if len(issues) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Build list of IDs to check and detect duplicates within batch
|
|
seenIDs := make(map[string]bool)
|
|
ids := make([]string, 0, len(issues))
|
|
for _, issue := range issues {
|
|
if issue.ID != "" {
|
|
// Check for duplicates within the batch
|
|
if seenIDs[issue.ID] {
|
|
return fmt.Errorf("duplicate issue ID within batch: %s", issue.ID)
|
|
}
|
|
seenIDs[issue.ID] = true
|
|
ids = append(ids, issue.ID)
|
|
}
|
|
}
|
|
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Check for existing IDs in database using a single query with IN clause
|
|
placeholders := make([]string, len(ids))
|
|
args := make([]interface{}, len(ids))
|
|
for i, id := range ids {
|
|
placeholders[i] = "?"
|
|
args[i] = id
|
|
}
|
|
|
|
query := fmt.Sprintf("SELECT id FROM issues WHERE id IN (%s) LIMIT 1", strings.Join(placeholders, ","))
|
|
var existingID string
|
|
err := conn.QueryRowContext(ctx, query, args...).Scan(&existingID)
|
|
if err == nil {
|
|
// Found an existing ID
|
|
return fmt.Errorf("issue ID %s already exists", existingID)
|
|
}
|
|
if err != sql.ErrNoRows {
|
|
// Unexpected error
|
|
return fmt.Errorf("failed to check for existing IDs: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateIssues creates multiple issues atomically in a single transaction.
|
|
// This provides significant performance improvements over calling CreateIssue in a loop:
|
|
// - Single connection acquisition
|
|
// - Single transaction
|
|
// - Atomic ID range reservation (one counter update for N issues)
|
|
// - All-or-nothing atomicity
|
|
//
|
|
// Expected 5-10x speedup for batches of 10+ issues.
|
|
// CreateIssues creates multiple issues atomically in a single transaction.
|
|
//
|
|
// This method is optimized for bulk issue creation and provides significant
|
|
// performance improvements over calling CreateIssue in a loop:
|
|
// - Single database connection and transaction
|
|
// - Atomic ID range reservation (one counter update for N IDs)
|
|
// - All-or-nothing semantics (rolls back on any error)
|
|
// - 5-15x faster than sequential CreateIssue calls
|
|
//
|
|
// All issues are validated before any database changes occur. If any issue
|
|
// fails validation, the entire batch is rejected.
|
|
//
|
|
// ID Assignment:
|
|
// - Issues with empty ID get auto-generated IDs from a reserved range
|
|
// - Issues with explicit IDs use those IDs (caller must ensure uniqueness)
|
|
// - Mix of explicit and auto-generated IDs is supported
|
|
//
|
|
// Timestamps:
|
|
// - All issues in the batch receive identical created_at/updated_at timestamps
|
|
// - This reflects that they were created as a single atomic operation
|
|
//
|
|
// Usage:
|
|
// // Bulk import from external source
|
|
// issues := []*types.Issue{...}
|
|
// if err := store.CreateIssues(ctx, issues, "import"); err != nil {
|
|
// return err
|
|
// }
|
|
//
|
|
// // After importing with explicit IDs, sync counters to prevent collisions
|
|
// REMOVED: SyncAllCounters example - no longer needed with hash IDs
|
|
//
|
|
// Performance:
|
|
// - 100 issues: ~30ms (vs ~900ms with CreateIssue loop)
|
|
// - 1000 issues: ~950ms (vs estimated 9s with CreateIssue loop)
|
|
//
|
|
// When to use:
|
|
// - Bulk imports from external systems (use CreateIssues)
|
|
// - Creating multiple related issues at once (use CreateIssues)
|
|
// - Single issue creation (use CreateIssue for simplicity)
|
|
// - Interactive user operations (use CreateIssue)
|
|
func (s *SQLiteStorage) CreateIssues(ctx context.Context, issues []*types.Issue, actor string) error {
|
|
// Default to OrphanResurrect for backward compatibility
|
|
return s.CreateIssuesWithOptions(ctx, issues, actor, OrphanResurrect)
|
|
}
|
|
|
|
// BatchCreateOptions contains options for batch issue creation
|
|
type BatchCreateOptions struct {
|
|
OrphanHandling OrphanHandling // How to handle missing parent issues
|
|
SkipPrefixValidation bool // Skip prefix validation for existing IDs (used during import)
|
|
}
|
|
|
|
// CreateIssuesWithOptions creates multiple issues with configurable orphan handling
|
|
func (s *SQLiteStorage) CreateIssuesWithOptions(ctx context.Context, issues []*types.Issue, actor string, orphanHandling OrphanHandling) error {
|
|
return s.CreateIssuesWithFullOptions(ctx, issues, actor, BatchCreateOptions{
|
|
OrphanHandling: orphanHandling,
|
|
SkipPrefixValidation: false,
|
|
})
|
|
}
|
|
|
|
// CreateIssuesWithFullOptions creates multiple issues with full options control
|
|
func (s *SQLiteStorage) CreateIssuesWithFullOptions(ctx context.Context, issues []*types.Issue, actor string, opts BatchCreateOptions) error {
|
|
if len(issues) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Fetch custom statuses and types for validation
|
|
customStatuses, err := s.GetCustomStatuses(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get custom statuses: %w", err)
|
|
}
|
|
customTypes, err := s.GetCustomTypes(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get custom types: %w", err)
|
|
}
|
|
|
|
// Phase 1: Validate all issues first (fail-fast, with custom status and type support)
|
|
if err := validateBatchIssuesWithCustom(issues, customStatuses, customTypes); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Phase 2: Acquire connection and start transaction
|
|
conn, err := s.db.Conn(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to acquire connection: %w", err)
|
|
}
|
|
defer func() { _ = conn.Close() }()
|
|
|
|
// Use retry logic with exponential backoff to handle SQLITE_BUSY under concurrent load
|
|
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
|
|
return fmt.Errorf("failed to begin immediate transaction: %w", err)
|
|
}
|
|
|
|
committed := false
|
|
defer func() {
|
|
if !committed {
|
|
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
|
|
}
|
|
}()
|
|
|
|
// Phase 3: Generate IDs for issues that need them
|
|
if err := s.generateBatchIDs(ctx, conn, issues, actor, opts.OrphanHandling, opts.SkipPrefixValidation); err != nil {
|
|
return wrapDBError("generate batch IDs", err)
|
|
}
|
|
|
|
// Phase 3.5: Check for conflicts with existing IDs in database
|
|
if err := checkForExistingIDs(ctx, conn, issues); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Phase 4: Bulk insert issues
|
|
if err := bulkInsertIssues(ctx, conn, issues); err != nil {
|
|
return wrapDBError("bulk insert issues", err)
|
|
}
|
|
|
|
// Phase 4.5: Update child counters for hierarchical IDs (GH#728 fix)
|
|
// This must happen AFTER insert so parents exist for the foreign key constraint
|
|
if err := updateChildCountersForHierarchicalIDs(ctx, conn, issues); err != nil {
|
|
return wrapDBError("update child counters", err)
|
|
}
|
|
|
|
// Phase 5: Record creation events
|
|
if err := bulkRecordEvents(ctx, conn, issues, actor); err != nil {
|
|
return wrapDBError("record creation events", err)
|
|
}
|
|
|
|
// Phase 6: Mark issues dirty for incremental export
|
|
if err := bulkMarkDirty(ctx, conn, issues); err != nil {
|
|
return wrapDBError("mark issues dirty", err)
|
|
}
|
|
|
|
// Phase 7: Commit transaction
|
|
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
|
|
return fmt.Errorf("failed to commit transaction: %w", err)
|
|
}
|
|
committed = true
|
|
return nil
|
|
}
|