refactor(sqlite): use withTx helper for remaining transaction entry points (#1276)
Refactors UpdateIssue, CloseIssue, CreateTombstone, and DeleteIssues to use the withTx helper with BEGIN IMMEDIATE instead of BeginTx. This completes the GH#1272 fix by ensuring all write transactions acquire write locks early, preventing deadlocks. Changes: - UpdateIssue: now uses withTx and markDirty helper - CloseIssue: now uses withTx and markDirty helper - CreateTombstone: now uses withTx and markDirty helper - DeleteIssues: now uses withTx with dbExecutor interface - Helper functions (resolveDeleteSet, expandWithDependents, validateNoDependents, etc.) changed from *sql.Tx to dbExecutor This is a P3 follow-up to the P1 sqlite lock fix (PR #1274). Closes: bd-fgzp Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -996,21 +996,7 @@ func (s *SQLiteStorage) UpdateIssue(ctx context.Context, id string, updates map[
|
|||||||
|
|
||||||
args = append(args, id)
|
args = append(args, id)
|
||||||
|
|
||||||
// Start transaction
|
// Prepare event data before transaction
|
||||||
tx, err := s.db.BeginTx(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = tx.Rollback() }()
|
|
||||||
|
|
||||||
// Update issue
|
|
||||||
query := fmt.Sprintf("UPDATE issues SET %s WHERE id = ?", strings.Join(setClauses, ", ")) // #nosec G201 - safe SQL with controlled column names
|
|
||||||
_, err = tx.ExecContext(ctx, query, args...)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to update issue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Record event
|
|
||||||
oldData, err := json.Marshal(oldIssue)
|
oldData, err := json.Marshal(oldIssue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Fall back to minimal description if marshaling fails
|
// Fall back to minimal description if marshaling fails
|
||||||
@@ -1023,38 +1009,47 @@ func (s *SQLiteStorage) UpdateIssue(ctx context.Context, id string, updates map[
|
|||||||
}
|
}
|
||||||
oldDataStr := string(oldData)
|
oldDataStr := string(oldData)
|
||||||
newDataStr := string(newData)
|
newDataStr := string(newData)
|
||||||
|
|
||||||
eventType := determineEventType(oldIssue, updates)
|
eventType := determineEventType(oldIssue, updates)
|
||||||
|
statusChanged := false
|
||||||
_, err = tx.ExecContext(ctx, `
|
if _, ok := updates["status"]; ok {
|
||||||
INSERT INTO events (issue_id, event_type, actor, old_value, new_value)
|
statusChanged = true
|
||||||
VALUES (?, ?, ?, ?, ?)
|
|
||||||
`, id, eventType, actor, oldDataStr, newDataStr)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to record event: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: Graph edges now managed via AddDependency() per Decision 004 Phase 4.
|
// Execute in transaction using BEGIN IMMEDIATE (GH#1272 fix)
|
||||||
|
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||||
// Mark issue as dirty for incremental export
|
// Update issue
|
||||||
_, err = tx.ExecContext(ctx, `
|
query := fmt.Sprintf("UPDATE issues SET %s WHERE id = ?", strings.Join(setClauses, ", ")) // #nosec G201 - safe SQL with controlled column names
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
_, err := conn.ExecContext(ctx, query, args...)
|
||||||
VALUES (?, ?)
|
if err != nil {
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
return fmt.Errorf("failed to update issue: %w", err)
|
||||||
`, id, time.Now())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate blocked issues cache if status changed
|
|
||||||
// Status changes affect which issues are blocked (blockers must be open/in_progress/blocked)
|
|
||||||
if _, statusChanged := updates["status"]; statusChanged {
|
|
||||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
|
||||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Commit()
|
// Record event
|
||||||
|
_, err = conn.ExecContext(ctx, `
|
||||||
|
INSERT INTO events (issue_id, event_type, actor, old_value, new_value)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
`, id, eventType, actor, oldDataStr, newDataStr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to record event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Graph edges now managed via AddDependency() per Decision 004 Phase 4.
|
||||||
|
|
||||||
|
// Mark issue as dirty for incremental export
|
||||||
|
if err := markDirty(ctx, conn, id); err != nil {
|
||||||
|
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate blocked issues cache if status changed
|
||||||
|
// Status changes affect which issues are blocked (blockers must be open/in_progress/blocked)
|
||||||
|
if statusChanged {
|
||||||
|
if err := s.invalidateBlockedCache(ctx, conn); err != nil {
|
||||||
|
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateIssueID updates an issue ID and all its text fields in a single transaction
|
// UpdateIssueID updates an issue ID and all its text fields in a single transaction
|
||||||
@@ -1208,136 +1203,122 @@ func (s *SQLiteStorage) ResetCounter(ctx context.Context, prefix string) error {
|
|||||||
func (s *SQLiteStorage) CloseIssue(ctx context.Context, id string, reason string, actor string, session string) error {
|
func (s *SQLiteStorage) CloseIssue(ctx context.Context, id string, reason string, actor string, session string) error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// Update with special event handling
|
// Execute in transaction using BEGIN IMMEDIATE (GH#1272 fix)
|
||||||
tx, err := s.db.BeginTx(ctx, nil)
|
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||||
if err != nil {
|
// NOTE: close_reason is stored in two places:
|
||||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
// 1. issues.close_reason - for direct queries (bd show --json, exports)
|
||||||
}
|
// 2. events.comment - for audit history (when was it closed, by whom)
|
||||||
defer func() { _ = tx.Rollback() }()
|
// Keep both in sync. If refactoring, consider deriving one from the other.
|
||||||
|
result, err := conn.ExecContext(ctx, `
|
||||||
// NOTE: close_reason is stored in two places:
|
UPDATE issues SET status = ?, closed_at = ?, updated_at = ?, close_reason = ?, closed_by_session = ?
|
||||||
// 1. issues.close_reason - for direct queries (bd show --json, exports)
|
WHERE id = ?
|
||||||
// 2. events.comment - for audit history (when was it closed, by whom)
|
`, types.StatusClosed, now, now, reason, session, id)
|
||||||
// Keep both in sync. If refactoring, consider deriving one from the other.
|
if err != nil {
|
||||||
result, err := tx.ExecContext(ctx, `
|
return fmt.Errorf("failed to close issue: %w", err)
|
||||||
UPDATE issues SET status = ?, closed_at = ?, updated_at = ?, close_reason = ?, closed_by_session = ?
|
|
||||||
WHERE id = ?
|
|
||||||
`, types.StatusClosed, now, now, reason, session, id)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to close issue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rows, err := result.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
|
||||||
}
|
|
||||||
if rows == 0 {
|
|
||||||
return fmt.Errorf("issue not found: %s", id)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = tx.ExecContext(ctx, `
|
|
||||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
|
||||||
VALUES (?, ?, ?, ?)
|
|
||||||
`, id, types.EventClosed, actor, reason)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to record event: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark issue as dirty for incremental export
|
|
||||||
_, err = tx.ExecContext(ctx, `
|
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
|
||||||
VALUES (?, ?)
|
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
|
||||||
`, id, time.Now())
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate blocked issues cache since status changed to closed
|
|
||||||
// Closed issues don't block others, so this affects blocking calculations
|
|
||||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
|
||||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reactive convoy completion: check if any convoys tracking this issue should auto-close
|
|
||||||
// Find convoys that track this issue (convoy.issue_id tracks closed_issue.depends_on_id)
|
|
||||||
// Uses gt:convoy label instead of issue_type for Gas Town separation
|
|
||||||
convoyRows, err := tx.QueryContext(ctx, `
|
|
||||||
SELECT DISTINCT d.issue_id
|
|
||||||
FROM dependencies d
|
|
||||||
JOIN issues i ON d.issue_id = i.id
|
|
||||||
JOIN labels l ON i.id = l.issue_id AND l.label = 'gt:convoy'
|
|
||||||
WHERE d.depends_on_id = ?
|
|
||||||
AND d.type = ?
|
|
||||||
AND i.status != ?
|
|
||||||
`, id, types.DepTracks, types.StatusClosed)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to find tracking convoys: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = convoyRows.Close() }()
|
|
||||||
|
|
||||||
var convoyIDs []string
|
|
||||||
for convoyRows.Next() {
|
|
||||||
var convoyID string
|
|
||||||
if err := convoyRows.Scan(&convoyID); err != nil {
|
|
||||||
return fmt.Errorf("failed to scan convoy ID: %w", err)
|
|
||||||
}
|
}
|
||||||
convoyIDs = append(convoyIDs, convoyID)
|
|
||||||
}
|
|
||||||
if err := convoyRows.Err(); err != nil {
|
|
||||||
return fmt.Errorf("convoy rows iteration error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// For each convoy, check if all tracked issues are now closed
|
rows, err := result.RowsAffected()
|
||||||
for _, convoyID := range convoyIDs {
|
if err != nil {
|
||||||
// Count non-closed tracked issues for this convoy
|
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||||
var openCount int
|
}
|
||||||
err := tx.QueryRowContext(ctx, `
|
if rows == 0 {
|
||||||
SELECT COUNT(*)
|
return fmt.Errorf("issue not found: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = conn.ExecContext(ctx, `
|
||||||
|
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
`, id, types.EventClosed, actor, reason)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to record event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark issue as dirty for incremental export
|
||||||
|
if err := markDirty(ctx, conn, id); err != nil {
|
||||||
|
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Invalidate blocked issues cache since status changed to closed
|
||||||
|
// Closed issues don't block others, so this affects blocking calculations
|
||||||
|
if err := s.invalidateBlockedCache(ctx, conn); err != nil {
|
||||||
|
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reactive convoy completion: check if any convoys tracking this issue should auto-close
|
||||||
|
// Find convoys that track this issue (convoy.issue_id tracks closed_issue.depends_on_id)
|
||||||
|
// Uses gt:convoy label instead of issue_type for Gas Town separation
|
||||||
|
convoyRows, err := conn.QueryContext(ctx, `
|
||||||
|
SELECT DISTINCT d.issue_id
|
||||||
FROM dependencies d
|
FROM dependencies d
|
||||||
JOIN issues i ON d.depends_on_id = i.id
|
JOIN issues i ON d.issue_id = i.id
|
||||||
WHERE d.issue_id = ?
|
JOIN labels l ON i.id = l.issue_id AND l.label = 'gt:convoy'
|
||||||
|
WHERE d.depends_on_id = ?
|
||||||
AND d.type = ?
|
AND d.type = ?
|
||||||
AND i.status != ?
|
AND i.status != ?
|
||||||
AND i.status != ?
|
`, id, types.DepTracks, types.StatusClosed)
|
||||||
`, convoyID, types.DepTracks, types.StatusClosed, types.StatusTombstone).Scan(&openCount)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to count open tracked issues for convoy %s: %w", convoyID, err)
|
return fmt.Errorf("failed to find tracking convoys: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = convoyRows.Close() }()
|
||||||
|
|
||||||
|
var convoyIDs []string
|
||||||
|
for convoyRows.Next() {
|
||||||
|
var convoyID string
|
||||||
|
if err := convoyRows.Scan(&convoyID); err != nil {
|
||||||
|
return fmt.Errorf("failed to scan convoy ID: %w", err)
|
||||||
|
}
|
||||||
|
convoyIDs = append(convoyIDs, convoyID)
|
||||||
|
}
|
||||||
|
if err := convoyRows.Err(); err != nil {
|
||||||
|
return fmt.Errorf("convoy rows iteration error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all tracked issues are closed, auto-close the convoy
|
// For each convoy, check if all tracked issues are now closed
|
||||||
if openCount == 0 {
|
for _, convoyID := range convoyIDs {
|
||||||
closeReason := "All tracked issues completed"
|
// Count non-closed tracked issues for this convoy
|
||||||
_, err := tx.ExecContext(ctx, `
|
var openCount int
|
||||||
UPDATE issues SET status = ?, closed_at = ?, updated_at = ?, close_reason = ?
|
err := conn.QueryRowContext(ctx, `
|
||||||
WHERE id = ?
|
SELECT COUNT(*)
|
||||||
`, types.StatusClosed, now, now, closeReason, convoyID)
|
FROM dependencies d
|
||||||
|
JOIN issues i ON d.depends_on_id = i.id
|
||||||
|
WHERE d.issue_id = ?
|
||||||
|
AND d.type = ?
|
||||||
|
AND i.status != ?
|
||||||
|
AND i.status != ?
|
||||||
|
`, convoyID, types.DepTracks, types.StatusClosed, types.StatusTombstone).Scan(&openCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to auto-close convoy %s: %w", convoyID, err)
|
return fmt.Errorf("failed to count open tracked issues for convoy %s: %w", convoyID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the close event
|
// If all tracked issues are closed, auto-close the convoy
|
||||||
_, err = tx.ExecContext(ctx, `
|
if openCount == 0 {
|
||||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
closeReason := "All tracked issues completed"
|
||||||
VALUES (?, ?, ?, ?)
|
_, err := conn.ExecContext(ctx, `
|
||||||
`, convoyID, types.EventClosed, "system:convoy-completion", closeReason)
|
UPDATE issues SET status = ?, closed_at = ?, updated_at = ?, close_reason = ?
|
||||||
if err != nil {
|
WHERE id = ?
|
||||||
return fmt.Errorf("failed to record convoy close event: %w", err)
|
`, types.StatusClosed, now, now, closeReason, convoyID)
|
||||||
}
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to auto-close convoy %s: %w", convoyID, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Mark convoy as dirty
|
// Record the close event
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = conn.ExecContext(ctx, `
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||||
VALUES (?, ?)
|
VALUES (?, ?, ?, ?)
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
`, convoyID, types.EventClosed, "system:convoy-completion", closeReason)
|
||||||
`, convoyID, now)
|
if err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("failed to record convoy close event: %w", err)
|
||||||
return fmt.Errorf("failed to mark convoy dirty: %w", err)
|
}
|
||||||
|
|
||||||
|
// Mark convoy as dirty
|
||||||
|
if err := markDirty(ctx, conn, convoyID); err != nil {
|
||||||
|
return fmt.Errorf("failed to mark convoy dirty: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Commit()
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateTombstone converts an existing issue to a tombstone record.
|
// CreateTombstone converts an existing issue to a tombstone record.
|
||||||
@@ -1354,63 +1335,51 @@ func (s *SQLiteStorage) CreateTombstone(ctx context.Context, id string, actor st
|
|||||||
return fmt.Errorf("issue not found: %s", id)
|
return fmt.Errorf("issue not found: %s", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := s.db.BeginTx(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = tx.Rollback() }()
|
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
originalType := string(issue.IssueType)
|
originalType := string(issue.IssueType)
|
||||||
|
|
||||||
// Convert issue to tombstone
|
// Execute in transaction using BEGIN IMMEDIATE (GH#1272 fix)
|
||||||
// Note: closed_at must be set to NULL because of CHECK constraint:
|
return s.withTx(ctx, func(conn *sql.Conn) error {
|
||||||
// (status = 'closed') = (closed_at IS NOT NULL)
|
// Convert issue to tombstone
|
||||||
_, err = tx.ExecContext(ctx, `
|
// Note: closed_at must be set to NULL because of CHECK constraint:
|
||||||
UPDATE issues
|
// (status = 'closed') = (closed_at IS NOT NULL)
|
||||||
SET status = ?,
|
_, err := conn.ExecContext(ctx, `
|
||||||
closed_at = NULL,
|
UPDATE issues
|
||||||
deleted_at = ?,
|
SET status = ?,
|
||||||
deleted_by = ?,
|
closed_at = NULL,
|
||||||
delete_reason = ?,
|
deleted_at = ?,
|
||||||
original_type = ?,
|
deleted_by = ?,
|
||||||
updated_at = ?
|
delete_reason = ?,
|
||||||
WHERE id = ?
|
original_type = ?,
|
||||||
`, types.StatusTombstone, now, actor, reason, originalType, now, id)
|
updated_at = ?
|
||||||
if err != nil {
|
WHERE id = ?
|
||||||
return fmt.Errorf("failed to create tombstone: %w", err)
|
`, types.StatusTombstone, now, actor, reason, originalType, now, id)
|
||||||
}
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create tombstone: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Record tombstone creation event
|
// Record tombstone creation event
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = conn.ExecContext(ctx, `
|
||||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||||
VALUES (?, ?, ?, ?)
|
VALUES (?, ?, ?, ?)
|
||||||
`, id, "deleted", actor, reason)
|
`, id, "deleted", actor, reason)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to record tombstone event: %w", err)
|
return fmt.Errorf("failed to record tombstone event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark issue as dirty for incremental export
|
// Mark issue as dirty for incremental export
|
||||||
_, err = tx.ExecContext(ctx, `
|
if err := markDirty(ctx, conn, id); err != nil {
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||||
VALUES (?, ?)
|
}
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
|
||||||
`, id, now)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invalidate blocked issues cache since status changed
|
// Invalidate blocked issues cache since status changed
|
||||||
// Tombstone issues don't block others, so this affects blocking calculations
|
// Tombstone issues don't block others, so this affects blocking calculations
|
||||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
if err := s.invalidateBlockedCache(ctx, conn); err != nil {
|
||||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
return nil
|
||||||
return wrapDBError("commit tombstone transaction", err)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteIssue permanently removes an issue from the database
|
// DeleteIssue permanently removes an issue from the database
|
||||||
@@ -1503,37 +1472,36 @@ func (s *SQLiteStorage) DeleteIssues(ctx context.Context, ids []string, cascade
|
|||||||
return &DeleteIssuesResult{}, nil
|
return &DeleteIssuesResult{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := s.db.BeginTx(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
|
||||||
}
|
|
||||||
defer func() { _ = tx.Rollback() }()
|
|
||||||
|
|
||||||
idSet := buildIDSet(ids)
|
idSet := buildIDSet(ids)
|
||||||
result := &DeleteIssuesResult{}
|
result := &DeleteIssuesResult{}
|
||||||
|
|
||||||
expandedIDs, err := s.resolveDeleteSet(ctx, tx, ids, idSet, cascade, force, result)
|
// Execute in transaction using BEGIN IMMEDIATE (GH#1272 fix)
|
||||||
|
err := s.withTx(ctx, func(conn *sql.Conn) error {
|
||||||
|
expandedIDs, err := s.resolveDeleteSet(ctx, conn, ids, idSet, cascade, force, result)
|
||||||
|
if err != nil {
|
||||||
|
return wrapDBError("resolve delete set", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
inClause, args := buildSQLInClause(expandedIDs)
|
||||||
|
if err := s.populateDeleteStats(ctx, conn, inClause, args, result); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if dryRun {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.executeDelete(ctx, conn, inClause, args, result); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, wrapDBError("resolve delete set", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
inClause, args := buildSQLInClause(expandedIDs)
|
|
||||||
if err := s.populateDeleteStats(ctx, tx, inClause, args, result); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if dryRun {
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.executeDelete(ctx, tx, inClause, args, result); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to commit transaction: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// REMOVED: Counter sync after deletion - no longer needed with hash IDs
|
// REMOVED: Counter sync after deletion - no longer needed with hash IDs
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
@@ -1547,18 +1515,18 @@ func buildIDSet(ids []string) map[string]bool {
|
|||||||
return idSet
|
return idSet
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) resolveDeleteSet(ctx context.Context, tx *sql.Tx, ids []string, idSet map[string]bool, cascade bool, force bool, result *DeleteIssuesResult) ([]string, error) {
|
func (s *SQLiteStorage) resolveDeleteSet(ctx context.Context, exec dbExecutor, ids []string, idSet map[string]bool, cascade bool, force bool, result *DeleteIssuesResult) ([]string, error) {
|
||||||
if cascade {
|
if cascade {
|
||||||
return s.expandWithDependents(ctx, tx, ids, idSet)
|
return s.expandWithDependents(ctx, exec, ids, idSet)
|
||||||
}
|
}
|
||||||
if !force {
|
if !force {
|
||||||
return ids, s.validateNoDependents(ctx, tx, ids, idSet, result)
|
return ids, s.validateNoDependents(ctx, exec, ids, idSet, result)
|
||||||
}
|
}
|
||||||
return ids, s.trackOrphanedIssues(ctx, tx, ids, idSet, result)
|
return ids, s.trackOrphanedIssues(ctx, exec, ids, idSet, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) expandWithDependents(ctx context.Context, tx *sql.Tx, ids []string, _ map[string]bool) ([]string, error) {
|
func (s *SQLiteStorage) expandWithDependents(ctx context.Context, exec dbExecutor, ids []string, _ map[string]bool) ([]string, error) {
|
||||||
allToDelete, err := s.findAllDependentsRecursive(ctx, tx, ids)
|
allToDelete, err := s.findAllDependentsRecursive(ctx, exec, ids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to find dependents: %w", err)
|
return nil, fmt.Errorf("failed to find dependents: %w", err)
|
||||||
}
|
}
|
||||||
@@ -1569,18 +1537,18 @@ func (s *SQLiteStorage) expandWithDependents(ctx context.Context, tx *sql.Tx, id
|
|||||||
return expandedIDs, nil
|
return expandedIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) validateNoDependents(ctx context.Context, tx *sql.Tx, ids []string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
func (s *SQLiteStorage) validateNoDependents(ctx context.Context, exec dbExecutor, ids []string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
if err := s.checkSingleIssueValidation(ctx, tx, id, idSet, result); err != nil {
|
if err := s.checkSingleIssueValidation(ctx, exec, id, idSet, result); err != nil {
|
||||||
return wrapDBError("check dependents", err)
|
return wrapDBError("check dependents", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) checkSingleIssueValidation(ctx context.Context, tx *sql.Tx, id string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
func (s *SQLiteStorage) checkSingleIssueValidation(ctx context.Context, exec dbExecutor, id string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
||||||
var depCount int
|
var depCount int
|
||||||
err := tx.QueryRowContext(ctx,
|
err := exec.QueryRowContext(ctx,
|
||||||
`SELECT COUNT(*) FROM dependencies WHERE depends_on_id = ?`, id).Scan(&depCount)
|
`SELECT COUNT(*) FROM dependencies WHERE depends_on_id = ?`, id).Scan(&depCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to check dependents for %s: %w", id, err)
|
return fmt.Errorf("failed to check dependents for %s: %w", id, err)
|
||||||
@@ -1589,7 +1557,7 @@ func (s *SQLiteStorage) checkSingleIssueValidation(ctx context.Context, tx *sql.
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := tx.QueryContext(ctx,
|
rows, err := exec.QueryContext(ctx,
|
||||||
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get dependents for %s: %w", id, err)
|
return fmt.Errorf("failed to get dependents for %s: %w", id, err)
|
||||||
@@ -1618,10 +1586,10 @@ func (s *SQLiteStorage) checkSingleIssueValidation(ctx context.Context, tx *sql.
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) trackOrphanedIssues(ctx context.Context, tx *sql.Tx, ids []string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
func (s *SQLiteStorage) trackOrphanedIssues(ctx context.Context, exec dbExecutor, ids []string, idSet map[string]bool, result *DeleteIssuesResult) error {
|
||||||
orphanSet := make(map[string]bool)
|
orphanSet := make(map[string]bool)
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
if err := s.collectOrphansForID(ctx, tx, id, idSet, orphanSet); err != nil {
|
if err := s.collectOrphansForID(ctx, exec, id, idSet, orphanSet); err != nil {
|
||||||
return wrapDBError("collect orphans", err)
|
return wrapDBError("collect orphans", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1631,8 +1599,8 @@ func (s *SQLiteStorage) trackOrphanedIssues(ctx context.Context, tx *sql.Tx, ids
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) collectOrphansForID(ctx context.Context, tx *sql.Tx, id string, idSet map[string]bool, orphanSet map[string]bool) error {
|
func (s *SQLiteStorage) collectOrphansForID(ctx context.Context, exec dbExecutor, id string, idSet map[string]bool, orphanSet map[string]bool) error {
|
||||||
rows, err := tx.QueryContext(ctx,
|
rows, err := exec.QueryContext(ctx,
|
||||||
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get dependents for %s: %w", id, err)
|
return fmt.Errorf("failed to get dependents for %s: %w", id, err)
|
||||||
@@ -1661,7 +1629,7 @@ func buildSQLInClause(ids []string) (string, []interface{}) {
|
|||||||
return strings.Join(placeholders, ","), args
|
return strings.Join(placeholders, ","), args
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) populateDeleteStats(ctx context.Context, tx *sql.Tx, inClause string, args []interface{}, result *DeleteIssuesResult) error {
|
func (s *SQLiteStorage) populateDeleteStats(ctx context.Context, exec dbExecutor, inClause string, args []interface{}, result *DeleteIssuesResult) error {
|
||||||
counts := []struct {
|
counts := []struct {
|
||||||
query string
|
query string
|
||||||
dest *int
|
dest *int
|
||||||
@@ -1676,7 +1644,7 @@ func (s *SQLiteStorage) populateDeleteStats(ctx context.Context, tx *sql.Tx, inC
|
|||||||
if c.dest == &result.DependenciesCount {
|
if c.dest == &result.DependenciesCount {
|
||||||
queryArgs = append(args, args...)
|
queryArgs = append(args, args...)
|
||||||
}
|
}
|
||||||
if err := tx.QueryRowContext(ctx, c.query, queryArgs...).Scan(c.dest); err != nil {
|
if err := exec.QueryRowContext(ctx, c.query, queryArgs...).Scan(c.dest); err != nil {
|
||||||
return fmt.Errorf("failed to count: %w", err)
|
return fmt.Errorf("failed to count: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1685,12 +1653,12 @@ func (s *SQLiteStorage) populateDeleteStats(ctx context.Context, tx *sql.Tx, inC
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause string, args []interface{}, result *DeleteIssuesResult) error {
|
func (s *SQLiteStorage) executeDelete(ctx context.Context, exec dbExecutor, inClause string, args []interface{}, result *DeleteIssuesResult) error {
|
||||||
// Note: This method now creates tombstones instead of hard-deleting
|
// Note: This method now creates tombstones instead of hard-deleting
|
||||||
// Only dependencies are deleted - issues are converted to tombstones
|
// Only dependencies are deleted - issues are converted to tombstones
|
||||||
|
|
||||||
// 1. Delete dependencies - tombstones don't block other issues
|
// 1. Delete dependencies - tombstones don't block other issues
|
||||||
_, err := tx.ExecContext(ctx,
|
_, err := exec.ExecContext(ctx,
|
||||||
fmt.Sprintf(`DELETE FROM dependencies WHERE issue_id IN (%s) OR depends_on_id IN (%s)`, inClause, inClause),
|
fmt.Sprintf(`DELETE FROM dependencies WHERE issue_id IN (%s) OR depends_on_id IN (%s)`, inClause, inClause),
|
||||||
append(args, args...)...)
|
append(args, args...)...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1699,7 +1667,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
|
|
||||||
// 2. Get issue types before converting to tombstones (need for original_type)
|
// 2. Get issue types before converting to tombstones (need for original_type)
|
||||||
issueTypes := make(map[string]string)
|
issueTypes := make(map[string]string)
|
||||||
rows, err := tx.QueryContext(ctx,
|
rows, err := exec.QueryContext(ctx,
|
||||||
fmt.Sprintf(`SELECT id, issue_type FROM issues WHERE id IN (%s)`, inClause),
|
fmt.Sprintf(`SELECT id, issue_type FROM issues WHERE id IN (%s)`, inClause),
|
||||||
args...)
|
args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1721,7 +1689,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
deletedCount := 0
|
deletedCount := 0
|
||||||
for id, originalType := range issueTypes {
|
for id, originalType := range issueTypes {
|
||||||
execResult, err := tx.ExecContext(ctx, `
|
execResult, err := exec.ExecContext(ctx, `
|
||||||
UPDATE issues
|
UPDATE issues
|
||||||
SET status = ?,
|
SET status = ?,
|
||||||
closed_at = NULL,
|
closed_at = NULL,
|
||||||
@@ -1743,7 +1711,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
deletedCount++
|
deletedCount++
|
||||||
|
|
||||||
// Record tombstone creation event
|
// Record tombstone creation event
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = exec.ExecContext(ctx, `
|
||||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||||
VALUES (?, ?, ?, ?)
|
VALUES (?, ?, ?, ?)
|
||||||
`, id, "deleted", "batch delete", "batch delete")
|
`, id, "deleted", "batch delete", "batch delete")
|
||||||
@@ -1752,7 +1720,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Mark issue as dirty for incremental export
|
// Mark issue as dirty for incremental export
|
||||||
_, err = tx.ExecContext(ctx, `
|
_, err = exec.ExecContext(ctx, `
|
||||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||||
VALUES (?, ?)
|
VALUES (?, ?)
|
||||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||||
@@ -1763,7 +1731,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 4. Invalidate blocked issues cache since statuses changed
|
// 4. Invalidate blocked issues cache since statuses changed
|
||||||
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
|
if err := s.invalidateBlockedCache(ctx, exec); err != nil {
|
||||||
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1772,7 +1740,7 @@ func (s *SQLiteStorage) executeDelete(ctx context.Context, tx *sql.Tx, inClause
|
|||||||
}
|
}
|
||||||
|
|
||||||
// findAllDependentsRecursive finds all issues that depend on the given issues, recursively
|
// findAllDependentsRecursive finds all issues that depend on the given issues, recursively
|
||||||
func (s *SQLiteStorage) findAllDependentsRecursive(ctx context.Context, tx *sql.Tx, ids []string) (map[string]bool, error) {
|
func (s *SQLiteStorage) findAllDependentsRecursive(ctx context.Context, exec dbExecutor, ids []string) (map[string]bool, error) {
|
||||||
result := make(map[string]bool)
|
result := make(map[string]bool)
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
result[id] = true
|
result[id] = true
|
||||||
@@ -1785,7 +1753,7 @@ func (s *SQLiteStorage) findAllDependentsRecursive(ctx context.Context, tx *sql.
|
|||||||
current := toProcess[0]
|
current := toProcess[0]
|
||||||
toProcess = toProcess[1:]
|
toProcess = toProcess[1:]
|
||||||
|
|
||||||
rows, err := tx.QueryContext(ctx,
|
rows, err := exec.QueryContext(ctx,
|
||||||
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, current)
|
`SELECT issue_id FROM dependencies WHERE depends_on_id = ?`, current)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
Reference in New Issue
Block a user