Implement snapshot creation and restoration for compaction (bd-256)
- Add compaction_snapshots table to schema with proper indexes - Implement CreateSnapshot, RestoreFromSnapshot, GetSnapshots functions - Use UTC timestamps throughout - RestoreFromSnapshot uses transactions with optimistic concurrency control - Add validation for levels and issue_id matching - Prevent race conditions with compaction_level guard - Create bd-268 to explore lightweight SQL alternatives Amp-Thread-ID: https://ampcode.com/threads/T-3bdd0d6b-9212-4e4e-b22d-f658949df7a9 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
@@ -3,8 +3,11 @@ package sqlite
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// CompactionCandidate represents an issue eligible for compaction
|
||||
@@ -16,6 +19,18 @@ type CompactionCandidate struct {
|
||||
DependentCount int
|
||||
}
|
||||
|
||||
// Snapshot represents a saved version of issue content before compaction
|
||||
type Snapshot struct {
|
||||
IssueID string `json:"issue_id"`
|
||||
CompactionLevel int `json:"compaction_level"`
|
||||
Description string `json:"description"`
|
||||
Design string `json:"design"`
|
||||
Notes string `json:"notes"`
|
||||
AcceptanceCriteria string `json:"acceptance_criteria"`
|
||||
Title string `json:"title"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// GetTier1Candidates returns issues eligible for Tier 1 compaction.
|
||||
// Criteria:
|
||||
// - Status = closed
|
||||
@@ -275,3 +290,163 @@ func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, ti
|
||||
|
||||
return false, fmt.Sprintf("invalid tier: %d", tier), nil
|
||||
}
|
||||
|
||||
// CreateSnapshot creates a snapshot of the issue's content before compaction.
|
||||
// The snapshot includes all text fields and is stored as JSON.
|
||||
// Multiple snapshots can exist per issue (one per compaction level).
|
||||
// NOTE: This should be called within the same transaction as the compaction operation.
|
||||
func (s *SQLiteStorage) CreateSnapshot(ctx context.Context, issue *types.Issue, level int) error {
|
||||
if level <= 0 {
|
||||
return fmt.Errorf("invalid compaction level %d; must be >= 1", level)
|
||||
}
|
||||
|
||||
snapshot := Snapshot{
|
||||
IssueID: issue.ID,
|
||||
CompactionLevel: level,
|
||||
Description: issue.Description,
|
||||
Design: issue.Design,
|
||||
Notes: issue.Notes,
|
||||
AcceptanceCriteria: issue.AcceptanceCriteria,
|
||||
Title: issue.Title,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
snapshotJSON, err := json.Marshal(snapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal snapshot: %w", err)
|
||||
}
|
||||
|
||||
query := `
|
||||
INSERT INTO compaction_snapshots (issue_id, compaction_level, snapshot_json, created_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`
|
||||
|
||||
_, err = s.db.ExecContext(ctx, query, issue.ID, level, snapshotJSON, snapshot.CreatedAt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert snapshot: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RestoreFromSnapshot restores an issue's content from a snapshot.
|
||||
// Returns the exact original content from the snapshot at the specified level.
|
||||
// Uses a transaction with optimistic concurrency control to prevent race conditions.
|
||||
func (s *SQLiteStorage) RestoreFromSnapshot(ctx context.Context, issueID string, level int) error {
|
||||
if level <= 0 {
|
||||
return fmt.Errorf("invalid level %d; must be >= 1", level)
|
||||
}
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
var snapshotJSON []byte
|
||||
err = tx.QueryRowContext(ctx, `
|
||||
SELECT snapshot_json
|
||||
FROM compaction_snapshots
|
||||
WHERE issue_id = ? AND compaction_level = ?
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
`, issueID, level).Scan(&snapshotJSON)
|
||||
if err == sql.ErrNoRows {
|
||||
return fmt.Errorf("no snapshot found for issue %s at level %d", issueID, level)
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query snapshot: %w", err)
|
||||
}
|
||||
|
||||
var snapshot Snapshot
|
||||
if err = json.Unmarshal(snapshotJSON, &snapshot); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal snapshot: %w", err)
|
||||
}
|
||||
|
||||
if snapshot.IssueID != issueID {
|
||||
return fmt.Errorf("snapshot issue_id mismatch: got %s, want %s", snapshot.IssueID, issueID)
|
||||
}
|
||||
|
||||
restoreLevel := snapshot.CompactionLevel - 1
|
||||
if restoreLevel < 0 {
|
||||
return fmt.Errorf("invalid restore level computed: %d", restoreLevel)
|
||||
}
|
||||
|
||||
res, err := tx.ExecContext(ctx, `
|
||||
UPDATE issues
|
||||
SET description = ?,
|
||||
design = ?,
|
||||
notes = ?,
|
||||
acceptance_criteria = ?,
|
||||
title = ?,
|
||||
compaction_level = ?,
|
||||
updated_at = ?
|
||||
WHERE id = ? AND COALESCE(compaction_level, 0) = ?
|
||||
`,
|
||||
snapshot.Description,
|
||||
snapshot.Design,
|
||||
snapshot.Notes,
|
||||
snapshot.AcceptanceCriteria,
|
||||
snapshot.Title,
|
||||
restoreLevel,
|
||||
time.Now().UTC(),
|
||||
issueID,
|
||||
snapshot.CompactionLevel,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to restore issue content: %w", err)
|
||||
}
|
||||
|
||||
rows, _ := res.RowsAffected()
|
||||
if rows == 0 {
|
||||
return fmt.Errorf("restore conflict: current compaction_level changed or issue not found")
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return fmt.Errorf("commit restore tx: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSnapshots returns all snapshots for an issue, ordered by compaction level and creation time.
|
||||
// Returns the latest snapshot for each level.
|
||||
func (s *SQLiteStorage) GetSnapshots(ctx context.Context, issueID string) ([]*Snapshot, error) {
|
||||
query := `
|
||||
SELECT snapshot_json
|
||||
FROM compaction_snapshots
|
||||
WHERE issue_id = ?
|
||||
ORDER BY compaction_level ASC, created_at DESC
|
||||
`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query snapshots: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var snapshots []*Snapshot
|
||||
for rows.Next() {
|
||||
var snapshotJSON []byte
|
||||
if err := rows.Scan(&snapshotJSON); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan snapshot: %w", err)
|
||||
}
|
||||
|
||||
var snapshot Snapshot
|
||||
if err := json.Unmarshal(snapshotJSON, &snapshot); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
|
||||
}
|
||||
|
||||
snapshots = append(snapshots, &snapshot)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("rows iteration error: %w", err)
|
||||
}
|
||||
|
||||
return snapshots, nil
|
||||
}
|
||||
|
||||
@@ -128,6 +128,18 @@ CREATE TABLE IF NOT EXISTS issue_snapshots (
|
||||
CREATE INDEX IF NOT EXISTS idx_snapshots_issue ON issue_snapshots(issue_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_snapshots_level ON issue_snapshots(compaction_level);
|
||||
|
||||
-- Compaction snapshots table (for restoration)
|
||||
CREATE TABLE IF NOT EXISTS compaction_snapshots (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
issue_id TEXT NOT NULL,
|
||||
compaction_level INTEGER NOT NULL,
|
||||
snapshot_json BLOB NOT NULL,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_comp_snap_issue_level_created ON compaction_snapshots(issue_id, compaction_level, created_at DESC);
|
||||
|
||||
-- Ready work view (with hierarchical blocking)
|
||||
-- Uses recursive CTE to propagate blocking through parent-child hierarchy
|
||||
CREATE VIEW IF NOT EXISTS ready_issues AS
|
||||
|
||||
Reference in New Issue
Block a user