bd sync: 2025-11-24 12:25:34
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -12,6 +12,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
@@ -1096,6 +1097,16 @@ func (m *MemoryStorage) UnderlyingConn(ctx context.Context) (*sql.Conn, error) {
|
||||
return nil, fmt.Errorf("UnderlyingConn not available in memory storage")
|
||||
}
|
||||
|
||||
// RunInTransaction executes a function within a transaction context.
|
||||
// For MemoryStorage, this provides basic atomicity via mutex locking.
|
||||
// If the function returns an error, changes are NOT automatically rolled back
|
||||
// since MemoryStorage doesn't support true transaction rollback.
|
||||
//
|
||||
// Note: For full rollback support, callers should use SQLite storage.
|
||||
func (m *MemoryStorage) RunInTransaction(ctx context.Context, fn func(tx storage.Transaction) error) error {
|
||||
return fmt.Errorf("RunInTransaction not supported in --no-db mode: use SQLite storage for transaction support")
|
||||
}
|
||||
|
||||
// REMOVED (bd-c7af): SyncAllCounters - no longer needed with hash IDs
|
||||
|
||||
// MarkIssueDirty marks an issue as dirty for export
|
||||
|
||||
758
internal/storage/sqlite/transaction.go
Normal file
758
internal/storage/sqlite/transaction.go
Normal file
@@ -0,0 +1,758 @@
|
||||
// Package sqlite implements the storage interface using SQLite.
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// Verify sqliteTxStorage implements storage.Transaction at compile time
|
||||
var _ storage.Transaction = (*sqliteTxStorage)(nil)
|
||||
|
||||
// sqliteTxStorage implements the storage.Transaction interface for SQLite.
|
||||
// It wraps a dedicated database connection with an active transaction.
|
||||
type sqliteTxStorage struct {
|
||||
conn *sql.Conn // Dedicated connection for the transaction
|
||||
parent *SQLiteStorage // Parent storage for accessing shared state
|
||||
}
|
||||
|
||||
// RunInTransaction executes a function within a database transaction.
|
||||
//
|
||||
// The transaction uses BEGIN IMMEDIATE to acquire a write lock early,
|
||||
// preventing deadlocks when multiple goroutines compete for the same lock.
|
||||
//
|
||||
// Transaction lifecycle:
|
||||
// 1. Acquire dedicated connection from pool
|
||||
// 2. Begin IMMEDIATE transaction with retry on SQLITE_BUSY
|
||||
// 3. Execute user function with Transaction interface
|
||||
// 4. On success: COMMIT
|
||||
// 5. On error or panic: ROLLBACK
|
||||
//
|
||||
// Panic safety: If the callback panics, the transaction is rolled back
|
||||
// and the panic is re-raised to the caller.
|
||||
func (s *SQLiteStorage) RunInTransaction(ctx context.Context, fn func(tx storage.Transaction) error) error {
|
||||
// Acquire a dedicated connection for the transaction.
|
||||
// This ensures all operations in the transaction use the same connection.
|
||||
conn, err := s.db.Conn(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to acquire connection for transaction: %w", err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
// Start IMMEDIATE transaction to acquire write lock early.
|
||||
// Use retry logic with exponential backoff to handle SQLITE_BUSY (bd-ola6)
|
||||
if err := beginImmediateWithRetry(ctx, conn, 5, 10*time.Millisecond); err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
|
||||
// Track commit state for cleanup
|
||||
committed := false
|
||||
defer func() {
|
||||
if !committed {
|
||||
// Use background context to ensure rollback completes even if ctx is cancelled
|
||||
_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
|
||||
}
|
||||
}()
|
||||
|
||||
// Handle panics: rollback and re-raise
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
// Rollback will happen via the committed=false check above
|
||||
panic(r) // Re-raise the panic
|
||||
}
|
||||
}()
|
||||
|
||||
// Create transaction wrapper
|
||||
txStorage := &sqliteTxStorage{
|
||||
conn: conn,
|
||||
parent: s,
|
||||
}
|
||||
|
||||
// Execute user function
|
||||
if err := fn(txStorage); err != nil {
|
||||
return err // Rollback happens in defer
|
||||
}
|
||||
|
||||
// Commit the transaction
|
||||
if _, err := conn.ExecContext(ctx, "COMMIT"); err != nil {
|
||||
return fmt.Errorf("failed to commit transaction: %w", err)
|
||||
}
|
||||
committed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateIssue creates a new issue within the transaction.
|
||||
func (t *sqliteTxStorage) CreateIssue(ctx context.Context, issue *types.Issue, actor string) error {
|
||||
// Validate issue before creating
|
||||
if err := issue.Validate(); err != nil {
|
||||
return fmt.Errorf("validation failed: %w", err)
|
||||
}
|
||||
|
||||
// Set timestamps
|
||||
now := time.Now()
|
||||
issue.CreatedAt = now
|
||||
issue.UpdatedAt = now
|
||||
|
||||
// Compute content hash (bd-95)
|
||||
if issue.ContentHash == "" {
|
||||
issue.ContentHash = issue.ComputeContentHash()
|
||||
}
|
||||
|
||||
// Get prefix from config (needed for both ID generation and validation)
|
||||
var prefix string
|
||||
err := t.conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
|
||||
if err == sql.ErrNoRows || prefix == "" {
|
||||
// CRITICAL: Reject operation if issue_prefix config is missing (bd-166)
|
||||
return fmt.Errorf("database not initialized: issue_prefix config is missing (run 'bd init --prefix <prefix>' first)")
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to get config: %w", err)
|
||||
}
|
||||
|
||||
// Generate or validate ID
|
||||
if issue.ID == "" {
|
||||
// Generate hash-based ID with adaptive length based on database size (bd-ea2a13)
|
||||
generatedID, err := GenerateIssueID(ctx, t.conn, prefix, issue, actor)
|
||||
if err != nil {
|
||||
return wrapDBError("generate issue ID", err)
|
||||
}
|
||||
issue.ID = generatedID
|
||||
} else {
|
||||
// Validate that explicitly provided ID matches the configured prefix (bd-177)
|
||||
if err := ValidateIssueIDPrefix(issue.ID, prefix); err != nil {
|
||||
return wrapDBError("validate issue ID prefix", err)
|
||||
}
|
||||
|
||||
// For hierarchical IDs (bd-a3f8e9.1), ensure parent exists
|
||||
if strings.Contains(issue.ID, ".") {
|
||||
// Try to resurrect entire parent chain if any parents are missing
|
||||
resurrected, err := t.parent.tryResurrectParentChainWithConn(ctx, t.conn, issue.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to resurrect parent chain for %s: %w", issue.ID, err)
|
||||
}
|
||||
if !resurrected {
|
||||
// Parent(s) not found in JSONL history - cannot proceed
|
||||
lastDot := strings.LastIndex(issue.ID, ".")
|
||||
parentID := issue.ID[:lastDot]
|
||||
return fmt.Errorf("parent issue %s does not exist and could not be resurrected from JSONL history", parentID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Insert issue
|
||||
if err := insertIssue(ctx, t.conn, issue); err != nil {
|
||||
return wrapDBError("insert issue", err)
|
||||
}
|
||||
|
||||
// Record creation event
|
||||
if err := recordCreatedEvent(ctx, t.conn, issue, actor); err != nil {
|
||||
return wrapDBError("record creation event", err)
|
||||
}
|
||||
|
||||
// Mark issue as dirty for incremental export
|
||||
if err := markDirty(ctx, t.conn, issue.ID); err != nil {
|
||||
return wrapDBError("mark issue dirty", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateIssues creates multiple issues within the transaction.
|
||||
func (t *sqliteTxStorage) CreateIssues(ctx context.Context, issues []*types.Issue, actor string) error {
|
||||
if len(issues) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate and prepare all issues first
|
||||
now := time.Now()
|
||||
for _, issue := range issues {
|
||||
if err := issue.Validate(); err != nil {
|
||||
return fmt.Errorf("validation failed for issue: %w", err)
|
||||
}
|
||||
issue.CreatedAt = now
|
||||
issue.UpdatedAt = now
|
||||
if issue.ContentHash == "" {
|
||||
issue.ContentHash = issue.ComputeContentHash()
|
||||
}
|
||||
}
|
||||
|
||||
// Get prefix from config
|
||||
var prefix string
|
||||
err := t.conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&prefix)
|
||||
if err == sql.ErrNoRows || prefix == "" {
|
||||
return fmt.Errorf("database not initialized: issue_prefix config is missing")
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to get config: %w", err)
|
||||
}
|
||||
|
||||
// Generate IDs for issues that don't have them
|
||||
for _, issue := range issues {
|
||||
if issue.ID == "" {
|
||||
generatedID, err := GenerateIssueID(ctx, t.conn, prefix, issue, actor)
|
||||
if err != nil {
|
||||
return wrapDBError("generate issue ID", err)
|
||||
}
|
||||
issue.ID = generatedID
|
||||
} else {
|
||||
if err := ValidateIssueIDPrefix(issue.ID, prefix); err != nil {
|
||||
return wrapDBError("validate issue ID prefix", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for duplicate IDs within the batch
|
||||
seenIDs := make(map[string]bool)
|
||||
for _, issue := range issues {
|
||||
if seenIDs[issue.ID] {
|
||||
return fmt.Errorf("duplicate issue ID within batch: %s", issue.ID)
|
||||
}
|
||||
seenIDs[issue.ID] = true
|
||||
}
|
||||
|
||||
// Insert all issues
|
||||
if err := insertIssues(ctx, t.conn, issues); err != nil {
|
||||
return wrapDBError("insert issues", err)
|
||||
}
|
||||
|
||||
// Record creation events
|
||||
if err := recordCreatedEvents(ctx, t.conn, issues, actor); err != nil {
|
||||
return wrapDBError("record creation events", err)
|
||||
}
|
||||
|
||||
// Mark all issues as dirty
|
||||
if err := markDirtyBatch(ctx, t.conn, issues); err != nil {
|
||||
return wrapDBError("mark issues dirty", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetIssue retrieves an issue within the transaction.
|
||||
// This enables read-your-writes semantics within the transaction.
|
||||
func (t *sqliteTxStorage) GetIssue(ctx context.Context, id string) (*types.Issue, error) {
|
||||
var issue types.Issue
|
||||
var closedAt sql.NullTime
|
||||
var estimatedMinutes sql.NullInt64
|
||||
var assignee sql.NullString
|
||||
var externalRef sql.NullString
|
||||
var compactedAt sql.NullTime
|
||||
var originalSize sql.NullInt64
|
||||
var sourceRepo sql.NullString
|
||||
var contentHash sql.NullString
|
||||
var compactedAtCommit sql.NullString
|
||||
|
||||
err := t.conn.QueryRowContext(ctx, `
|
||||
SELECT id, content_hash, title, description, design, acceptance_criteria, notes,
|
||||
status, priority, issue_type, assignee, estimated_minutes,
|
||||
created_at, updated_at, closed_at, external_ref,
|
||||
compaction_level, compacted_at, compacted_at_commit, original_size, source_repo
|
||||
FROM issues
|
||||
WHERE id = ?
|
||||
`, id).Scan(
|
||||
&issue.ID, &contentHash, &issue.Title, &issue.Description, &issue.Design,
|
||||
&issue.AcceptanceCriteria, &issue.Notes, &issue.Status,
|
||||
&issue.Priority, &issue.IssueType, &assignee, &estimatedMinutes,
|
||||
&issue.CreatedAt, &issue.UpdatedAt, &closedAt, &externalRef,
|
||||
&issue.CompactionLevel, &compactedAt, &compactedAtCommit, &originalSize, &sourceRepo,
|
||||
)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get issue: %w", err)
|
||||
}
|
||||
|
||||
if contentHash.Valid {
|
||||
issue.ContentHash = contentHash.String
|
||||
}
|
||||
if closedAt.Valid {
|
||||
issue.ClosedAt = &closedAt.Time
|
||||
}
|
||||
if estimatedMinutes.Valid {
|
||||
mins := int(estimatedMinutes.Int64)
|
||||
issue.EstimatedMinutes = &mins
|
||||
}
|
||||
if assignee.Valid {
|
||||
issue.Assignee = assignee.String
|
||||
}
|
||||
if externalRef.Valid {
|
||||
issue.ExternalRef = &externalRef.String
|
||||
}
|
||||
if compactedAt.Valid {
|
||||
issue.CompactedAt = &compactedAt.Time
|
||||
}
|
||||
if compactedAtCommit.Valid {
|
||||
issue.CompactedAtCommit = &compactedAtCommit.String
|
||||
}
|
||||
if originalSize.Valid {
|
||||
issue.OriginalSize = int(originalSize.Int64)
|
||||
}
|
||||
if sourceRepo.Valid {
|
||||
issue.SourceRepo = sourceRepo.String
|
||||
}
|
||||
|
||||
// Fetch labels for this issue using the transaction connection
|
||||
labels, err := t.getLabels(ctx, issue.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get labels: %w", err)
|
||||
}
|
||||
issue.Labels = labels
|
||||
|
||||
return &issue, nil
|
||||
}
|
||||
|
||||
// getLabels retrieves labels using the transaction's connection
|
||||
func (t *sqliteTxStorage) getLabels(ctx context.Context, issueID string) ([]string, error) {
|
||||
rows, err := t.conn.QueryContext(ctx, `
|
||||
SELECT label FROM labels WHERE issue_id = ? ORDER BY label
|
||||
`, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get labels: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
var labels []string
|
||||
for rows.Next() {
|
||||
var label string
|
||||
if err := rows.Scan(&label); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
labels = append(labels, label)
|
||||
}
|
||||
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
// UpdateIssue updates an issue within the transaction.
|
||||
func (t *sqliteTxStorage) UpdateIssue(ctx context.Context, id string, updates map[string]interface{}, actor string) error {
|
||||
// Get old issue for event
|
||||
oldIssue, err := t.GetIssue(ctx, id)
|
||||
if err != nil {
|
||||
return wrapDBError("get issue for update", err)
|
||||
}
|
||||
if oldIssue == nil {
|
||||
return fmt.Errorf("issue %s not found", id)
|
||||
}
|
||||
|
||||
// Build update query with validated field names
|
||||
setClauses := []string{"updated_at = ?"}
|
||||
args := []interface{}{time.Now()}
|
||||
|
||||
for key, value := range updates {
|
||||
// Prevent SQL injection by validating field names
|
||||
if !allowedUpdateFields[key] {
|
||||
return fmt.Errorf("invalid field for update: %s", key)
|
||||
}
|
||||
|
||||
// Validate field values
|
||||
if err := validateFieldUpdate(key, value); err != nil {
|
||||
return wrapDBError("validate field update", err)
|
||||
}
|
||||
|
||||
setClauses = append(setClauses, fmt.Sprintf("%s = ?", key))
|
||||
args = append(args, value)
|
||||
}
|
||||
|
||||
// Auto-manage closed_at when status changes
|
||||
setClauses, args = manageClosedAt(oldIssue, updates, setClauses, args)
|
||||
|
||||
// Recompute content_hash if any content fields changed (bd-95)
|
||||
contentChanged := false
|
||||
contentFields := []string{"title", "description", "design", "acceptance_criteria", "notes", "status", "priority", "issue_type", "assignee", "external_ref"}
|
||||
for _, field := range contentFields {
|
||||
if _, exists := updates[field]; exists {
|
||||
contentChanged = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if contentChanged {
|
||||
updatedIssue := *oldIssue
|
||||
applyUpdatesToIssue(&updatedIssue, updates)
|
||||
newHash := updatedIssue.ComputeContentHash()
|
||||
setClauses = append(setClauses, "content_hash = ?")
|
||||
args = append(args, newHash)
|
||||
}
|
||||
|
||||
args = append(args, id)
|
||||
|
||||
// Update issue
|
||||
query := fmt.Sprintf("UPDATE issues SET %s WHERE id = ?", strings.Join(setClauses, ", ")) // #nosec G201 - safe SQL with controlled column names
|
||||
_, err = t.conn.ExecContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update issue: %w", err)
|
||||
}
|
||||
|
||||
// Record event
|
||||
oldData, err := json.Marshal(oldIssue)
|
||||
if err != nil {
|
||||
oldData = []byte(fmt.Sprintf(`{"id":"%s"}`, id))
|
||||
}
|
||||
newData, err := json.Marshal(updates)
|
||||
if err != nil {
|
||||
newData = []byte(`{}`)
|
||||
}
|
||||
|
||||
eventType := determineEventType(oldIssue, updates)
|
||||
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, old_value, new_value)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
`, id, eventType, actor, string(oldData), string(newData))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark issue as dirty
|
||||
if err := markDirty(ctx, t.conn, id); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyUpdatesToIssue applies update map to issue for content hash recomputation
|
||||
func applyUpdatesToIssue(issue *types.Issue, updates map[string]interface{}) {
|
||||
for key, value := range updates {
|
||||
switch key {
|
||||
case "title":
|
||||
issue.Title = value.(string)
|
||||
case "description":
|
||||
issue.Description = value.(string)
|
||||
case "design":
|
||||
issue.Design = value.(string)
|
||||
case "acceptance_criteria":
|
||||
issue.AcceptanceCriteria = value.(string)
|
||||
case "notes":
|
||||
issue.Notes = value.(string)
|
||||
case "status":
|
||||
if s, ok := value.(types.Status); ok {
|
||||
issue.Status = s
|
||||
} else {
|
||||
issue.Status = types.Status(value.(string))
|
||||
}
|
||||
case "priority":
|
||||
issue.Priority = value.(int)
|
||||
case "issue_type":
|
||||
if t, ok := value.(types.IssueType); ok {
|
||||
issue.IssueType = t
|
||||
} else {
|
||||
issue.IssueType = types.IssueType(value.(string))
|
||||
}
|
||||
case "assignee":
|
||||
if value == nil {
|
||||
issue.Assignee = ""
|
||||
} else {
|
||||
issue.Assignee = value.(string)
|
||||
}
|
||||
case "external_ref":
|
||||
if value == nil {
|
||||
issue.ExternalRef = nil
|
||||
} else {
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
issue.ExternalRef = &v
|
||||
case *string:
|
||||
issue.ExternalRef = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CloseIssue closes an issue within the transaction.
|
||||
func (t *sqliteTxStorage) CloseIssue(ctx context.Context, id string, reason string, actor string) error {
|
||||
now := time.Now()
|
||||
|
||||
result, err := t.conn.ExecContext(ctx, `
|
||||
UPDATE issues SET status = ?, closed_at = ?, updated_at = ?
|
||||
WHERE id = ?
|
||||
`, types.StatusClosed, now, now, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to close issue: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected: %w", err)
|
||||
}
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("issue not found: %s", id)
|
||||
}
|
||||
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, id, types.EventClosed, actor, reason)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark issue as dirty
|
||||
if err := markDirty(ctx, t.conn, id); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteIssue deletes an issue within the transaction.
|
||||
func (t *sqliteTxStorage) DeleteIssue(ctx context.Context, id string) error {
|
||||
// Delete dependencies (both directions)
|
||||
_, err := t.conn.ExecContext(ctx, `DELETE FROM dependencies WHERE issue_id = ? OR depends_on_id = ?`, id, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dependencies: %w", err)
|
||||
}
|
||||
|
||||
// Delete events
|
||||
_, err = t.conn.ExecContext(ctx, `DELETE FROM events WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete events: %w", err)
|
||||
}
|
||||
|
||||
// Delete from dirty_issues
|
||||
_, err = t.conn.ExecContext(ctx, `DELETE FROM dirty_issues WHERE issue_id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete dirty marker: %w", err)
|
||||
}
|
||||
|
||||
// Delete the issue itself
|
||||
result, err := t.conn.ExecContext(ctx, `DELETE FROM issues WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete issue: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return fmt.Errorf("issue not found: %s", id)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddDependency adds a dependency between issues within the transaction.
|
||||
func (t *sqliteTxStorage) AddDependency(ctx context.Context, dep *types.Dependency, actor string) error {
|
||||
// Validate dependency type
|
||||
if !dep.Type.IsValid() {
|
||||
return fmt.Errorf("invalid dependency type: %s (must be blocks, related, parent-child, or discovered-from)", dep.Type)
|
||||
}
|
||||
|
||||
// Validate that both issues exist
|
||||
issueExists, err := t.GetIssue(ctx, dep.IssueID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check issue %s: %w", dep.IssueID, err)
|
||||
}
|
||||
if issueExists == nil {
|
||||
return fmt.Errorf("issue %s not found", dep.IssueID)
|
||||
}
|
||||
|
||||
dependsOnExists, err := t.GetIssue(ctx, dep.DependsOnID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check dependency %s: %w", dep.DependsOnID, err)
|
||||
}
|
||||
if dependsOnExists == nil {
|
||||
return fmt.Errorf("dependency target %s not found", dep.DependsOnID)
|
||||
}
|
||||
|
||||
// Prevent self-dependency
|
||||
if dep.IssueID == dep.DependsOnID {
|
||||
return fmt.Errorf("issue cannot depend on itself")
|
||||
}
|
||||
|
||||
// Validate parent-child dependency direction
|
||||
if dep.Type == types.DepParentChild {
|
||||
if issueExists.IssueType == types.TypeEpic && dependsOnExists.IssueType != types.TypeEpic {
|
||||
return fmt.Errorf("invalid parent-child dependency: parent (%s) cannot depend on child (%s). Use: bd dep add %s %s --type parent-child",
|
||||
dep.IssueID, dep.DependsOnID, dep.DependsOnID, dep.IssueID)
|
||||
}
|
||||
}
|
||||
|
||||
if dep.CreatedAt.IsZero() {
|
||||
dep.CreatedAt = time.Now()
|
||||
}
|
||||
if dep.CreatedBy == "" {
|
||||
dep.CreatedBy = actor
|
||||
}
|
||||
|
||||
// Cycle detection
|
||||
var cycleExists bool
|
||||
err = t.conn.QueryRowContext(ctx, `
|
||||
WITH RECURSIVE paths AS (
|
||||
SELECT
|
||||
issue_id,
|
||||
depends_on_id,
|
||||
1 as depth
|
||||
FROM dependencies
|
||||
WHERE issue_id = ?
|
||||
|
||||
UNION ALL
|
||||
|
||||
SELECT
|
||||
d.issue_id,
|
||||
d.depends_on_id,
|
||||
p.depth + 1
|
||||
FROM dependencies d
|
||||
JOIN paths p ON d.issue_id = p.depends_on_id
|
||||
WHERE p.depth < 100
|
||||
)
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM paths
|
||||
WHERE depends_on_id = ?
|
||||
)
|
||||
`, dep.DependsOnID, dep.IssueID).Scan(&cycleExists)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check for cycles: %w", err)
|
||||
}
|
||||
|
||||
if cycleExists {
|
||||
return fmt.Errorf("cannot add dependency: would create a cycle (%s → %s → ... → %s)",
|
||||
dep.IssueID, dep.DependsOnID, dep.IssueID)
|
||||
}
|
||||
|
||||
// Insert dependency
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO dependencies (issue_id, depends_on_id, type, created_at, created_by)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
`, dep.IssueID, dep.DependsOnID, dep.Type, dep.CreatedAt, dep.CreatedBy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add dependency: %w", err)
|
||||
}
|
||||
|
||||
// Record event
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, dep.IssueID, types.EventDependencyAdded, actor,
|
||||
fmt.Sprintf("Added dependency: %s %s %s", dep.IssueID, dep.Type, dep.DependsOnID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark both issues as dirty
|
||||
if err := markDirty(ctx, t.conn, dep.IssueID); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
if err := markDirty(ctx, t.conn, dep.DependsOnID); err != nil {
|
||||
return fmt.Errorf("failed to mark depends-on issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveDependency removes a dependency within the transaction.
|
||||
func (t *sqliteTxStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
|
||||
result, err := t.conn.ExecContext(ctx, `
|
||||
DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
|
||||
`, issueID, dependsOnID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove dependency: %w", err)
|
||||
}
|
||||
|
||||
// Check if dependency existed
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rowsAffected == 0 {
|
||||
return fmt.Errorf("dependency from %s to %s does not exist", issueID, dependsOnID)
|
||||
}
|
||||
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, types.EventDependencyRemoved, actor,
|
||||
fmt.Sprintf("Removed dependency on %s", dependsOnID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark both issues as dirty
|
||||
if err := markDirty(ctx, t.conn, issueID); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
if err := markDirty(ctx, t.conn, dependsOnID); err != nil {
|
||||
return fmt.Errorf("failed to mark depends-on issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddLabel adds a label to an issue within the transaction.
|
||||
func (t *sqliteTxStorage) AddLabel(ctx context.Context, issueID, label, actor string) error {
|
||||
result, err := t.conn.ExecContext(ctx, `
|
||||
INSERT OR IGNORE INTO labels (issue_id, label) VALUES (?, ?)
|
||||
`, issueID, label)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add label: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rows == 0 {
|
||||
// Label already existed, no change made
|
||||
return nil
|
||||
}
|
||||
|
||||
// Record event
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, types.EventLabelAdded, actor, fmt.Sprintf("Added label: %s", label))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark issue as dirty
|
||||
if err := markDirty(ctx, t.conn, issueID); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveLabel removes a label from an issue within the transaction.
|
||||
func (t *sqliteTxStorage) RemoveLabel(ctx context.Context, issueID, label, actor string) error {
|
||||
result, err := t.conn.ExecContext(ctx, `
|
||||
DELETE FROM labels WHERE issue_id = ? AND label = ?
|
||||
`, issueID, label)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove label: %w", err)
|
||||
}
|
||||
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check rows affected: %w", err)
|
||||
}
|
||||
if rows == 0 {
|
||||
// Label didn't exist, no change made
|
||||
return nil
|
||||
}
|
||||
|
||||
// Record event
|
||||
_, err = t.conn.ExecContext(ctx, `
|
||||
INSERT INTO events (issue_id, event_type, actor, comment)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, types.EventLabelRemoved, actor, fmt.Sprintf("Removed label: %s", label))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record event: %w", err)
|
||||
}
|
||||
|
||||
// Mark issue as dirty
|
||||
if err := markDirty(ctx, t.conn, issueID); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
857
internal/storage/sqlite/transaction_test.go
Normal file
857
internal/storage/sqlite/transaction_test.go
Normal file
@@ -0,0 +1,857 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// TestRunInTransactionBasic verifies the RunInTransaction method exists and
|
||||
// can be called.
|
||||
func TestRunInTransactionBasic(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Test that we can call RunInTransaction
|
||||
callCount := 0
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
callCount++
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("RunInTransaction returned error: %v", err)
|
||||
}
|
||||
|
||||
if callCount != 1 {
|
||||
t.Errorf("expected callback to be called once, got %d", callCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunInTransactionRollbackOnError verifies that returning an error
|
||||
// from the callback does not cause a panic and the error is propagated.
|
||||
func TestRunInTransactionRollbackOnError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
expectedErr := "intentional test error"
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return &testError{msg: expectedErr}
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected error to be returned, got nil")
|
||||
}
|
||||
|
||||
if err.Error() != expectedErr {
|
||||
t.Errorf("expected error %q, got %q", expectedErr, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunInTransactionPanicRecovery verifies that panics in the callback
|
||||
// are recovered and re-raised after rollback.
|
||||
func TestRunInTransactionPanicRecovery(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Error("expected panic to be re-raised, but no panic occurred")
|
||||
} else if r != "test panic" {
|
||||
t.Errorf("unexpected panic value: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
_ = store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
panic("test panic")
|
||||
})
|
||||
|
||||
t.Error("should not reach here - panic should have been re-raised")
|
||||
}
|
||||
|
||||
// TestTransactionCreateIssue tests creating an issue within a transaction.
|
||||
func TestTransactionCreateIssue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var createdID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
createdID = issue.ID
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
if createdID == "" {
|
||||
t.Error("expected issue ID to be set after creation")
|
||||
}
|
||||
|
||||
// Verify issue exists after commit
|
||||
issue, err := store.GetIssue(ctx, createdID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if issue == nil {
|
||||
t.Error("expected issue to exist after transaction commit")
|
||||
}
|
||||
if issue.Title != "Test Issue" {
|
||||
t.Errorf("expected title 'Test Issue', got %q", issue.Title)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionRollbackOnCreateError tests that issues are not created
|
||||
// when transaction rolls back due to error.
|
||||
func TestTransactionRollbackOnCreateError(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var createdID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
createdID = issue.ID
|
||||
|
||||
// Return error to trigger rollback
|
||||
return &testError{msg: "intentional rollback"}
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected error from transaction")
|
||||
}
|
||||
|
||||
// Verify issue does NOT exist after rollback
|
||||
if createdID != "" {
|
||||
issue, err := store.GetIssue(ctx, createdID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if issue != nil {
|
||||
t.Error("expected issue to NOT exist after transaction rollback")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionMultipleIssues tests creating multiple issues atomically.
|
||||
func TestTransactionMultipleIssues(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var ids []string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
for i := 0; i < 3; i++ {
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
ids = append(ids, issue.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify all issues exist
|
||||
for _, id := range ids {
|
||||
issue, err := store.GetIssue(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed for %s: %v", id, err)
|
||||
}
|
||||
if issue == nil {
|
||||
t.Errorf("expected issue %s to exist", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionUpdateIssue tests updating an issue within a transaction.
|
||||
func TestTransactionUpdateIssue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create issue first
|
||||
issue := &types.Issue{
|
||||
Title: "Original Title",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
// Update in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.UpdateIssue(ctx, issue.ID, map[string]interface{}{
|
||||
"title": "Updated Title",
|
||||
}, "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify update
|
||||
updated, err := store.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if updated.Title != "Updated Title" {
|
||||
t.Errorf("expected title 'Updated Title', got %q", updated.Title)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionCloseIssue tests closing an issue within a transaction.
|
||||
func TestTransactionCloseIssue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create issue first
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
// Close in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.CloseIssue(ctx, issue.ID, "Done", "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify closed
|
||||
closed, err := store.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if closed.Status != types.StatusClosed {
|
||||
t.Errorf("expected status 'closed', got %q", closed.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionDeleteIssue tests deleting an issue within a transaction.
|
||||
func TestTransactionDeleteIssue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create issue first
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
// Delete in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.DeleteIssue(ctx, issue.ID)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify deleted
|
||||
deleted, err := store.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if deleted != nil {
|
||||
t.Error("expected issue to be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionGetIssue tests read-your-writes within a transaction.
|
||||
func TestTransactionGetIssue(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// Create issue
|
||||
issue := &types.Issue{
|
||||
Title: "Test Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read it back within same transaction (read-your-writes)
|
||||
retrieved, err := tx.GetIssue(ctx, issue.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if retrieved == nil {
|
||||
t.Error("expected to read issue within transaction")
|
||||
}
|
||||
if retrieved.Title != "Test Issue" {
|
||||
t.Errorf("expected title 'Test Issue', got %q", retrieved.Title)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionCreateIssues tests batch issue creation within a transaction.
|
||||
func TestTransactionCreateIssues(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var ids []string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
issues := []*types.Issue{
|
||||
{Title: "Issue 1", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask},
|
||||
{Title: "Issue 2", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask},
|
||||
{Title: "Issue 3", Status: types.StatusOpen, Priority: 3, IssueType: types.TypeTask},
|
||||
}
|
||||
if err := tx.CreateIssues(ctx, issues, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, issue := range issues {
|
||||
ids = append(ids, issue.ID)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify all issues exist
|
||||
for i, id := range ids {
|
||||
issue, err := store.GetIssue(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed for %s: %v", id, err)
|
||||
}
|
||||
if issue == nil {
|
||||
t.Errorf("expected issue %s to exist", id)
|
||||
}
|
||||
expectedTitle := "Issue " + string(rune('1'+i))
|
||||
if issue.Title != expectedTitle {
|
||||
t.Errorf("expected title %q, got %q", expectedTitle, issue.Title)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionAddDependency tests adding a dependency within a transaction.
|
||||
func TestTransactionAddDependency(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create two issues first
|
||||
issue1 := &types.Issue{Title: "Issue 1", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
issue2 := &types.Issue{Title: "Issue 2", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask}
|
||||
if err := store.CreateIssue(ctx, issue1, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue2, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
// Add dependency in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
dep := &types.Dependency{
|
||||
IssueID: issue1.ID,
|
||||
DependsOnID: issue2.ID,
|
||||
Type: types.DepBlocks,
|
||||
}
|
||||
return tx.AddDependency(ctx, dep, "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify dependency exists
|
||||
deps, err := store.GetDependencies(ctx, issue1.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetDependencies failed: %v", err)
|
||||
}
|
||||
if len(deps) != 1 {
|
||||
t.Errorf("expected 1 dependency, got %d", len(deps))
|
||||
}
|
||||
if deps[0].ID != issue2.ID {
|
||||
t.Errorf("expected dependency on %s, got %s", issue2.ID, deps[0].ID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionRemoveDependency tests removing a dependency within a transaction.
|
||||
func TestTransactionRemoveDependency(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create two issues and add dependency
|
||||
issue1 := &types.Issue{Title: "Issue 1", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
issue2 := &types.Issue{Title: "Issue 2", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask}
|
||||
if err := store.CreateIssue(ctx, issue1, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
if err := store.CreateIssue(ctx, issue2, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
dep := &types.Dependency{IssueID: issue1.ID, DependsOnID: issue2.ID, Type: types.DepBlocks}
|
||||
if err := store.AddDependency(ctx, dep, "test-actor"); err != nil {
|
||||
t.Fatalf("AddDependency failed: %v", err)
|
||||
}
|
||||
|
||||
// Remove dependency in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.RemoveDependency(ctx, issue1.ID, issue2.ID, "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify dependency is gone
|
||||
deps, err := store.GetDependencies(ctx, issue1.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetDependencies failed: %v", err)
|
||||
}
|
||||
if len(deps) != 0 {
|
||||
t.Errorf("expected 0 dependencies, got %d", len(deps))
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionAddLabel tests adding a label within a transaction.
|
||||
func TestTransactionAddLabel(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create issue first
|
||||
issue := &types.Issue{Title: "Test Issue", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
if err := store.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
// Add label in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.AddLabel(ctx, issue.ID, "test-label", "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify label exists
|
||||
labels, err := store.GetLabels(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLabels failed: %v", err)
|
||||
}
|
||||
if len(labels) != 1 {
|
||||
t.Errorf("expected 1 label, got %d", len(labels))
|
||||
}
|
||||
if labels[0] != "test-label" {
|
||||
t.Errorf("expected label 'test-label', got %s", labels[0])
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionRemoveLabel tests removing a label within a transaction.
|
||||
func TestTransactionRemoveLabel(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create issue and add label
|
||||
issue := &types.Issue{Title: "Test Issue", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
if err := store.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
if err := store.AddLabel(ctx, issue.ID, "test-label", "test-actor"); err != nil {
|
||||
t.Fatalf("AddLabel failed: %v", err)
|
||||
}
|
||||
|
||||
// Remove label in transaction
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
return tx.RemoveLabel(ctx, issue.ID, "test-label", "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify label is gone
|
||||
labels, err := store.GetLabels(ctx, issue.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLabels failed: %v", err)
|
||||
}
|
||||
if len(labels) != 0 {
|
||||
t.Errorf("expected 0 labels, got %d", len(labels))
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionAtomicIssueWithDependency tests creating issue + adding dependency atomically.
|
||||
func TestTransactionAtomicIssueWithDependency(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
// Create parent issue first
|
||||
parent := &types.Issue{Title: "Parent", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
if err := store.CreateIssue(ctx, parent, "test-actor"); err != nil {
|
||||
t.Fatalf("CreateIssue failed: %v", err)
|
||||
}
|
||||
|
||||
var childID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// Create child issue
|
||||
child := &types.Issue{Title: "Child", Status: types.StatusOpen, Priority: 2, IssueType: types.TypeTask}
|
||||
if err := tx.CreateIssue(ctx, child, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
childID = child.ID
|
||||
|
||||
// Add dependency: child blocks parent (child must be done before parent)
|
||||
dep := &types.Dependency{
|
||||
IssueID: parent.ID,
|
||||
DependsOnID: child.ID,
|
||||
Type: types.DepBlocks,
|
||||
}
|
||||
return tx.AddDependency(ctx, dep, "test-actor")
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify both issue and dependency exist
|
||||
child, err := store.GetIssue(ctx, childID)
|
||||
if err != nil || child == nil {
|
||||
t.Error("expected child issue to exist")
|
||||
}
|
||||
|
||||
deps, err := store.GetDependencies(ctx, parent.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetDependencies failed: %v", err)
|
||||
}
|
||||
if len(deps) != 1 || deps[0].ID != childID {
|
||||
t.Error("expected dependency from parent to child")
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionAtomicIssueWithLabels tests creating issue + adding labels atomically.
|
||||
func TestTransactionAtomicIssueWithLabels(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var issueID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// Create issue
|
||||
issue := &types.Issue{Title: "Test Issue", Status: types.StatusOpen, Priority: 1, IssueType: types.TypeTask}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
issueID = issue.ID
|
||||
|
||||
// Add multiple labels
|
||||
for _, label := range []string{"label1", "label2", "label3"} {
|
||||
if err := tx.AddLabel(ctx, issue.ID, label, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify issue and all labels exist
|
||||
issue, err := store.GetIssue(ctx, issueID)
|
||||
if err != nil || issue == nil {
|
||||
t.Error("expected issue to exist")
|
||||
}
|
||||
|
||||
labels, err := store.GetLabels(ctx, issueID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLabels failed: %v", err)
|
||||
}
|
||||
if len(labels) != 3 {
|
||||
t.Errorf("expected 3 labels, got %d", len(labels))
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionEmpty tests that an empty transaction commits successfully.
|
||||
func TestTransactionEmpty(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// Do nothing - empty transaction
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("empty transaction should succeed, got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionConcurrent tests multiple concurrent transactions.
|
||||
func TestTransactionConcurrent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
const numGoroutines = 10
|
||||
errors := make(chan error, numGoroutines)
|
||||
ids := make(chan string, numGoroutines)
|
||||
|
||||
// Launch concurrent transactions
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
go func(index int) {
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
issue := &types.Issue{
|
||||
Title: "Concurrent Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: index % 4,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
ids <- issue.ID
|
||||
return nil
|
||||
})
|
||||
errors <- err
|
||||
}(i)
|
||||
}
|
||||
|
||||
// Collect results
|
||||
var errs []error
|
||||
var createdIDs []string
|
||||
for i := 0; i < numGoroutines; i++ {
|
||||
if err := <-errors; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
close(ids)
|
||||
for id := range ids {
|
||||
createdIDs = append(createdIDs, id)
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
t.Errorf("some transactions failed: %v", errs)
|
||||
}
|
||||
|
||||
if len(createdIDs) != numGoroutines {
|
||||
t.Errorf("expected %d issues created, got %d", numGoroutines, len(createdIDs))
|
||||
}
|
||||
|
||||
// Verify all issues exist
|
||||
for _, id := range createdIDs {
|
||||
issue, err := store.GetIssue(ctx, id)
|
||||
if err != nil || issue == nil {
|
||||
t.Errorf("expected issue %s to exist", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionNestedFailure tests that when first op succeeds but second fails,
|
||||
// both are rolled back.
|
||||
func TestTransactionNestedFailure(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var firstIssueID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// First operation succeeds
|
||||
issue1 := &types.Issue{
|
||||
Title: "First Issue",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 1,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, issue1, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
firstIssueID = issue1.ID
|
||||
|
||||
// Second operation fails
|
||||
issue2 := &types.Issue{
|
||||
Title: "", // Invalid - missing title
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
}
|
||||
return tx.CreateIssue(ctx, issue2, "test-actor")
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
t.Error("expected error from invalid second issue")
|
||||
}
|
||||
|
||||
// Verify first issue was NOT created (rolled back)
|
||||
if firstIssueID != "" {
|
||||
issue, err := store.GetIssue(ctx, firstIssueID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetIssue failed: %v", err)
|
||||
}
|
||||
if issue != nil {
|
||||
t.Error("expected first issue to be rolled back, but it exists")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransactionAtomicPlanApproval simulates a VC plan approval workflow:
|
||||
// creating multiple issues with dependencies and labels atomically.
|
||||
func TestTransactionAtomicPlanApproval(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, cleanup := setupTestDB(t)
|
||||
defer cleanup()
|
||||
|
||||
var epicID, task1ID, task2ID string
|
||||
err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// Create epic
|
||||
epic := &types.Issue{
|
||||
Title: "Epic: Feature Implementation",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 1,
|
||||
IssueType: types.TypeEpic,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, epic, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
epicID = epic.ID
|
||||
|
||||
// Create task 1
|
||||
task1 := &types.Issue{
|
||||
Title: "Task 1: Setup",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, task1, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
task1ID = task1.ID
|
||||
|
||||
// Create task 2
|
||||
task2 := &types.Issue{
|
||||
Title: "Task 2: Implementation",
|
||||
Status: types.StatusOpen,
|
||||
Priority: 2,
|
||||
IssueType: types.TypeTask,
|
||||
}
|
||||
if err := tx.CreateIssue(ctx, task2, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
task2ID = task2.ID
|
||||
|
||||
// Add dependencies: task2 depends on task1
|
||||
dep := &types.Dependency{
|
||||
IssueID: task2ID,
|
||||
DependsOnID: task1ID,
|
||||
Type: types.DepBlocks,
|
||||
}
|
||||
if err := tx.AddDependency(ctx, dep, "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add labels to all issues
|
||||
for _, id := range []string{epicID, task1ID, task2ID} {
|
||||
if err := tx.AddLabel(ctx, id, "feature-x", "test-actor"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("RunInTransaction failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify all issues exist
|
||||
for _, id := range []string{epicID, task1ID, task2ID} {
|
||||
issue, err := store.GetIssue(ctx, id)
|
||||
if err != nil || issue == nil {
|
||||
t.Errorf("expected issue %s to exist", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify dependency
|
||||
deps, err := store.GetDependencies(ctx, task2ID)
|
||||
if err != nil {
|
||||
t.Fatalf("GetDependencies failed: %v", err)
|
||||
}
|
||||
if len(deps) != 1 || deps[0].ID != task1ID {
|
||||
t.Error("expected task2 to depend on task1")
|
||||
}
|
||||
|
||||
// Verify labels
|
||||
for _, id := range []string{epicID, task1ID, task2ID} {
|
||||
labels, err := store.GetLabels(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetLabels failed: %v", err)
|
||||
}
|
||||
if len(labels) != 1 || labels[0] != "feature-x" {
|
||||
t.Errorf("expected 'feature-x' label on %s", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// testError is a simple error type for testing
|
||||
type testError struct {
|
||||
msg string
|
||||
}
|
||||
|
||||
func (e *testError) Error() string {
|
||||
return e.msg
|
||||
}
|
||||
@@ -8,6 +8,61 @@ import (
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// Transaction provides atomic multi-operation support within a single database transaction.
|
||||
//
|
||||
// The Transaction interface exposes a subset of Storage methods that execute within
|
||||
// a single database transaction. This enables atomic workflows where multiple operations
|
||||
// must either all succeed or all fail (e.g., creating issues with dependencies and labels).
|
||||
//
|
||||
// # Transaction Semantics
|
||||
//
|
||||
// - All operations within the transaction share the same database connection
|
||||
// - Changes are not visible to other connections until commit
|
||||
// - If any operation returns an error, the transaction is rolled back
|
||||
// - If the callback function panics, the transaction is rolled back
|
||||
// - On successful return from the callback, the transaction is committed
|
||||
//
|
||||
// # SQLite Specifics
|
||||
//
|
||||
// - Uses BEGIN IMMEDIATE mode to acquire write lock early
|
||||
// - This prevents deadlocks when multiple operations compete for the same lock
|
||||
// - IMMEDIATE mode serializes concurrent transactions properly
|
||||
//
|
||||
// # Example Usage
|
||||
//
|
||||
// err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// // Create parent issue
|
||||
// if err := tx.CreateIssue(ctx, parentIssue, actor); err != nil {
|
||||
// return err // Triggers rollback
|
||||
// }
|
||||
// // Create child issue
|
||||
// if err := tx.CreateIssue(ctx, childIssue, actor); err != nil {
|
||||
// return err // Triggers rollback
|
||||
// }
|
||||
// // Add dependency between them
|
||||
// if err := tx.AddDependency(ctx, dep, actor); err != nil {
|
||||
// return err // Triggers rollback
|
||||
// }
|
||||
// return nil // Triggers commit
|
||||
// })
|
||||
type Transaction interface {
|
||||
// Issue operations
|
||||
CreateIssue(ctx context.Context, issue *types.Issue, actor string) error
|
||||
CreateIssues(ctx context.Context, issues []*types.Issue, actor string) error
|
||||
UpdateIssue(ctx context.Context, id string, updates map[string]interface{}, actor string) error
|
||||
CloseIssue(ctx context.Context, id string, reason string, actor string) error
|
||||
DeleteIssue(ctx context.Context, id string) error
|
||||
GetIssue(ctx context.Context, id string) (*types.Issue, error) // For read-your-writes within transaction
|
||||
|
||||
// Dependency operations
|
||||
AddDependency(ctx context.Context, dep *types.Dependency, actor string) error
|
||||
RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error
|
||||
|
||||
// Label operations
|
||||
AddLabel(ctx context.Context, issueID, label, actor string) error
|
||||
RemoveLabel(ctx context.Context, issueID, label, actor string) error
|
||||
}
|
||||
|
||||
// Storage defines the interface for issue storage backends
|
||||
type Storage interface {
|
||||
// Issues
|
||||
@@ -89,6 +144,26 @@ type Storage interface {
|
||||
RenameDependencyPrefix(ctx context.Context, oldPrefix, newPrefix string) error
|
||||
RenameCounterPrefix(ctx context.Context, oldPrefix, newPrefix string) error
|
||||
|
||||
// Transactions
|
||||
//
|
||||
// RunInTransaction executes a function within a database transaction.
|
||||
// The Transaction interface provides atomic multi-operation support.
|
||||
//
|
||||
// Transaction behavior:
|
||||
// - If fn returns nil, the transaction is committed
|
||||
// - If fn returns an error, the transaction is rolled back
|
||||
// - If fn panics, the transaction is rolled back and the panic is re-raised
|
||||
// - Uses BEGIN IMMEDIATE for SQLite to acquire write lock early
|
||||
//
|
||||
// Example:
|
||||
// err := store.RunInTransaction(ctx, func(tx storage.Transaction) error {
|
||||
// if err := tx.CreateIssue(ctx, issue, actor); err != nil {
|
||||
// return err // Triggers rollback
|
||||
// }
|
||||
// return nil // Triggers commit
|
||||
// })
|
||||
RunInTransaction(ctx context.Context, fn func(tx Transaction) error) error
|
||||
|
||||
// Lifecycle
|
||||
Close() error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user