diff --git a/internal/storage/sqlite/sqlite.go b/internal/storage/sqlite/sqlite.go index a406b0a9..0529d234 100644 --- a/internal/storage/sqlite/sqlite.go +++ b/internal/storage/sqlite/sqlite.go @@ -579,6 +579,157 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act return nil } +// validateBatchIssues validates all issues in a batch and sets timestamps +func validateBatchIssues(issues []*types.Issue) error { + now := time.Now() + for i, issue := range issues { + if issue == nil { + return fmt.Errorf("issue %d is nil", i) + } + + issue.CreatedAt = now + issue.UpdatedAt = now + + if err := issue.Validate(); err != nil { + return fmt.Errorf("validation failed for issue %d: %w", i, err) + } + } + return nil +} + +// generateBatchIDs generates IDs for all issues that need them atomically +func generateBatchIDs(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error { + // Count how many issues need IDs + needIDCount := 0 + for _, issue := range issues { + if issue.ID == "" { + needIDCount++ + } + } + + if needIDCount == 0 { + return nil + } + + // Get prefix from config + var prefix string + err := conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix) + if err == sql.ErrNoRows || prefix == "" { + prefix = "bd" + } else if err != nil { + return fmt.Errorf("failed to get config: %w", err) + } + + // Atomically reserve ID range + var nextID int + err = conn.QueryRowContext(ctx, ` + INSERT INTO issue_counters (prefix, last_id) + SELECT ?, COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0) + ? + FROM issues + WHERE id LIKE ? || '-%' + AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*' + ON CONFLICT(prefix) DO UPDATE SET + last_id = MAX( + last_id, + (SELECT COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0) + FROM issues + WHERE id LIKE ? || '-%' + AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*') + ) + ? + RETURNING last_id + `, prefix, prefix, needIDCount, prefix, prefix, prefix, prefix, prefix, needIDCount).Scan(&nextID) + if err != nil { + return fmt.Errorf("failed to generate ID range: %w", err) + } + + // Assign IDs sequentially from the reserved range + currentID := nextID - needIDCount + 1 + for i := range issues { + if issues[i].ID == "" { + issues[i].ID = fmt.Sprintf("%s-%d", prefix, currentID) + currentID++ + } + } + return nil +} + +// bulkInsertIssues inserts all issues using a prepared statement +func bulkInsertIssues(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error { + stmt, err := conn.PrepareContext(ctx, ` + INSERT INTO issues ( + id, title, description, design, acceptance_criteria, notes, + status, priority, issue_type, assignee, estimated_minutes, + created_at, updated_at, closed_at, external_ref + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + for _, issue := range issues { + _, err = stmt.ExecContext(ctx, + issue.ID, issue.Title, issue.Description, issue.Design, + issue.AcceptanceCriteria, issue.Notes, issue.Status, + issue.Priority, issue.IssueType, issue.Assignee, + issue.EstimatedMinutes, issue.CreatedAt, issue.UpdatedAt, + issue.ClosedAt, issue.ExternalRef, + ) + if err != nil { + return fmt.Errorf("failed to insert issue %s: %w", issue.ID, err) + } + } + return nil +} + +// bulkRecordEvents records creation events for all issues +func bulkRecordEvents(ctx context.Context, conn *sql.Conn, issues []*types.Issue, actor string) error { + stmt, err := conn.PrepareContext(ctx, ` + INSERT INTO events (issue_id, event_type, actor, new_value) + VALUES (?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("failed to prepare event statement: %w", err) + } + defer stmt.Close() + + for _, issue := range issues { + eventData, err := json.Marshal(issue) + if err != nil { + // Fall back to minimal description if marshaling fails + eventData = []byte(fmt.Sprintf(`{"id":"%s","title":"%s"}`, issue.ID, issue.Title)) + } + + _, err = stmt.ExecContext(ctx, issue.ID, types.EventCreated, actor, string(eventData)) + if err != nil { + return fmt.Errorf("failed to record event for %s: %w", issue.ID, err) + } + } + return nil +} + +// bulkMarkDirty marks all issues as dirty for incremental export +func bulkMarkDirty(ctx context.Context, conn *sql.Conn, issues []*types.Issue) error { + stmt, err := conn.PrepareContext(ctx, ` + INSERT INTO dirty_issues (issue_id, marked_at) + VALUES (?, ?) + ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at + `) + if err != nil { + return fmt.Errorf("failed to prepare dirty statement: %w", err) + } + defer stmt.Close() + + dirtyTime := time.Now() + for _, issue := range issues { + _, err = stmt.ExecContext(ctx, issue.ID, dirtyTime) + if err != nil { + return fmt.Errorf("failed to mark dirty %s: %w", issue.ID, err) + } + } + return nil +} + // CreateIssues creates multiple issues atomically in a single transaction. // This provides significant performance improvements over calling CreateIssue in a loop: // - Single connection acquisition @@ -630,34 +781,22 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act // - Single issue creation (use CreateIssue for simplicity) // - Interactive user operations (use CreateIssue) func (s *SQLiteStorage) CreateIssues(ctx context.Context, issues []*types.Issue, actor string) error { - // Handle empty batch if len(issues) == 0 { return nil } - // Phase 1: Check for nil and validate all issues first (fail-fast) - now := time.Now() - for i, issue := range issues { - if issue == nil { - return fmt.Errorf("issue %d is nil", i) - } - - issue.CreatedAt = now - issue.UpdatedAt = now - - if err := issue.Validate(); err != nil { - return fmt.Errorf("validation failed for issue %d: %w", i, err) - } + // Phase 1: Validate all issues first (fail-fast) + if err := validateBatchIssues(issues); err != nil { + return err } - // Phase 2: Acquire dedicated connection and start transaction + // Phase 2: Acquire connection and start transaction conn, err := s.db.Conn(ctx) if err != nil { return fmt.Errorf("failed to acquire connection: %w", err) } defer conn.Close() - // Begin IMMEDIATE transaction to acquire write lock early if _, err := conn.ExecContext(ctx, "BEGIN IMMEDIATE"); err != nil { return fmt.Errorf("failed to begin immediate transaction: %w", err) } @@ -665,129 +804,28 @@ func (s *SQLiteStorage) CreateIssues(ctx context.Context, issues []*types.Issue, committed := false defer func() { if !committed { - conn.ExecContext(context.Background(), "ROLLBACK") + _, _ = conn.ExecContext(context.Background(), "ROLLBACK") } }() - // Phase 3: Batch ID generation - // Count how many issues need IDs - needIDCount := 0 - for _, issue := range issues { - if issue.ID == "" { - needIDCount++ - } + // Phase 3: Generate IDs for issues that need them + if err := generateBatchIDs(ctx, conn, issues); err != nil { + return err } - // Generate ID range atomically if needed - if needIDCount > 0 { - // Get prefix from config - var prefix string - err := conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix) - if err == sql.ErrNoRows || prefix == "" { - prefix = "bd" - } else if err != nil { - return fmt.Errorf("failed to get config: %w", err) - } - - // Atomically reserve ID range: [nextID-needIDCount+1, nextID] - // This is the key optimization - one counter update instead of N - var nextID int - err = conn.QueryRowContext(ctx, ` - INSERT INTO issue_counters (prefix, last_id) - SELECT ?, COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0) + ? - FROM issues - WHERE id LIKE ? || '-%' - AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*' - ON CONFLICT(prefix) DO UPDATE SET - last_id = MAX( - last_id, - (SELECT COALESCE(MAX(CAST(substr(id, LENGTH(?) + 2) AS INTEGER)), 0) - FROM issues - WHERE id LIKE ? || '-%' - AND substr(id, LENGTH(?) + 2) GLOB '[0-9]*') - ) + ? - RETURNING last_id - `, prefix, prefix, needIDCount, prefix, prefix, prefix, prefix, prefix, needIDCount).Scan(&nextID) - if err != nil { - return fmt.Errorf("failed to generate ID range: %w", err) - } - - // Assign IDs sequentially from the reserved range - currentID := nextID - needIDCount + 1 - for i := range issues { - if issues[i].ID == "" { - issues[i].ID = fmt.Sprintf("%s-%d", prefix, currentID) - currentID++ - } - } + // Phase 4: Bulk insert issues + if err := bulkInsertIssues(ctx, conn, issues); err != nil { + return err } - // Phase 4: Bulk insert issues using prepared statement - stmt, err := conn.PrepareContext(ctx, ` - INSERT INTO issues ( - id, title, description, design, acceptance_criteria, notes, - status, priority, issue_type, assignee, estimated_minutes, - created_at, updated_at, closed_at, external_ref - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - return fmt.Errorf("failed to prepare statement: %w", err) - } - defer stmt.Close() - - for _, issue := range issues { - _, err = stmt.ExecContext(ctx, - issue.ID, issue.Title, issue.Description, issue.Design, - issue.AcceptanceCriteria, issue.Notes, issue.Status, - issue.Priority, issue.IssueType, issue.Assignee, - issue.EstimatedMinutes, issue.CreatedAt, issue.UpdatedAt, - issue.ClosedAt, issue.ExternalRef, - ) - if err != nil { - return fmt.Errorf("failed to insert issue %s: %w", issue.ID, err) - } + // Phase 5: Record creation events + if err := bulkRecordEvents(ctx, conn, issues, actor); err != nil { + return err } - // Phase 5: Bulk record creation events - eventStmt, err := conn.PrepareContext(ctx, ` - INSERT INTO events (issue_id, event_type, actor, new_value) - VALUES (?, ?, ?, ?) - `) - if err != nil { - return fmt.Errorf("failed to prepare event statement: %w", err) - } - defer eventStmt.Close() - - for _, issue := range issues { - eventData, err := json.Marshal(issue) - if err != nil { - // Fall back to minimal description if marshaling fails - eventData = []byte(fmt.Sprintf(`{"id":"%s","title":"%s"}`, issue.ID, issue.Title)) - } - - _, err = eventStmt.ExecContext(ctx, issue.ID, types.EventCreated, actor, string(eventData)) - if err != nil { - return fmt.Errorf("failed to record event for %s: %w", issue.ID, err) - } - } - - // Phase 6: Bulk mark dirty for incremental export - dirtyStmt, err := conn.PrepareContext(ctx, ` - INSERT INTO dirty_issues (issue_id, marked_at) - VALUES (?, ?) - ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at - `) - if err != nil { - return fmt.Errorf("failed to prepare dirty statement: %w", err) - } - defer dirtyStmt.Close() - - dirtyTime := time.Now() - for _, issue := range issues { - _, err = dirtyStmt.ExecContext(ctx, issue.ID, dirtyTime) - if err != nil { - return fmt.Errorf("failed to mark dirty %s: %w", issue.ID, err) - } + // Phase 6: Mark issues dirty for incremental export + if err := bulkMarkDirty(ctx, conn, issues); err != nil { + return err } // Phase 7: Commit transaction