Add Compactor with CompactTier1 and CompactTier1Batch methods:
- Single-issue and batch compaction with 5 concurrent workers
- Dry-run mode for testing without API calls
- Smart size checking: keeps the original if the summary is longer
- Improved Haiku prompts to emphasize compression
- Add ApplyCompaction method for setting compaction metadata
- Comprehensive tests, including API integration tests
- All tests passing

474 lines · 13 KiB · Go
package sqlite
|
|
|
|
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"github.com/steveyegge/beads/internal/types"
)
|
|
|
|
// CompactionCandidate represents an issue eligible for compaction.
// Instances are produced by GetTier1Candidates / GetTier2Candidates.
type CompactionCandidate struct {
	IssueID  string    // ID of the closed issue that may be compacted
	ClosedAt time.Time // when the issue was closed; eligibility is age-based
	// OriginalSize is the combined length of the issue's text fields
	// (description, design, notes, acceptance criteria) before compaction.
	OriginalSize int
	// EstimatedSize is the projected post-compaction size. The candidate
	// queries always return 0 here; presumably callers fill it in later —
	// TODO(review): confirm against the compactor.
	EstimatedSize int
	// DependentCount is the number of dependent issues (tier 1) or, in the
	// tier-2 query, the issue's event count reused for this field.
	DependentCount int
}
|
|
|
|
// Snapshot represents a saved version of issue content before compaction.
// It is serialized to JSON and stored in the compaction_snapshots table so
// RestoreFromSnapshot can reverse a compaction exactly.
type Snapshot struct {
	IssueID         string `json:"issue_id"`         // issue this snapshot belongs to
	CompactionLevel int    `json:"compaction_level"` // level the issue was compacted TO when this snapshot was taken
	// The full pre-compaction text fields, preserved verbatim.
	Description        string    `json:"description"`
	Design             string    `json:"design"`
	Notes              string    `json:"notes"`
	AcceptanceCriteria string    `json:"acceptance_criteria"`
	Title              string    `json:"title"`
	CreatedAt          time.Time `json:"created_at"` // UTC time the snapshot was recorded
}
|
|
|
|
// GetTier1Candidates returns issues eligible for Tier 1 compaction.
|
|
// Criteria:
|
|
// - Status = closed
|
|
// - Closed for at least compact_tier1_days
|
|
// - No open dependents within compact_tier1_dep_levels depth
|
|
// - Not already compacted (compaction_level = 0)
|
|
func (s *SQLiteStorage) GetTier1Candidates(ctx context.Context) ([]*CompactionCandidate, error) {
|
|
// Get configuration
|
|
daysStr, err := s.GetConfig(ctx, "compact_tier1_days")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get compact_tier1_days: %w", err)
|
|
}
|
|
if daysStr == "" {
|
|
daysStr = "30"
|
|
}
|
|
|
|
depthStr, err := s.GetConfig(ctx, "compact_tier1_dep_levels")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get compact_tier1_dep_levels: %w", err)
|
|
}
|
|
if depthStr == "" {
|
|
depthStr = "2"
|
|
}
|
|
|
|
query := `
|
|
WITH RECURSIVE
|
|
-- Find all issues that depend on (are blocked by) other issues
|
|
dependent_tree AS (
|
|
-- Base case: direct dependents
|
|
SELECT
|
|
d.depends_on_id as issue_id,
|
|
i.id as dependent_id,
|
|
i.status as dependent_status,
|
|
0 as depth
|
|
FROM dependencies d
|
|
JOIN issues i ON d.issue_id = i.id
|
|
WHERE d.type = 'blocks'
|
|
|
|
UNION ALL
|
|
|
|
-- Recursive case: parent-child relationships
|
|
SELECT
|
|
dt.issue_id,
|
|
i.id as dependent_id,
|
|
i.status as dependent_status,
|
|
dt.depth + 1
|
|
FROM dependent_tree dt
|
|
JOIN dependencies d ON d.depends_on_id = dt.dependent_id
|
|
JOIN issues i ON d.issue_id = i.id
|
|
WHERE d.type = 'parent-child'
|
|
AND dt.depth < ?
|
|
)
|
|
SELECT
|
|
i.id,
|
|
i.closed_at,
|
|
COALESCE(i.original_size, LENGTH(i.description) + LENGTH(i.design) + LENGTH(i.notes) + LENGTH(i.acceptance_criteria)) as original_size,
|
|
0 as estimated_size,
|
|
COUNT(DISTINCT dt.dependent_id) as dependent_count
|
|
FROM issues i
|
|
LEFT JOIN dependent_tree dt ON i.id = dt.issue_id
|
|
AND dt.dependent_status IN ('open', 'in_progress', 'blocked')
|
|
AND dt.depth <= ?
|
|
WHERE i.status = 'closed'
|
|
AND i.closed_at IS NOT NULL
|
|
AND i.closed_at <= datetime('now', '-' || CAST(? AS INTEGER) || ' days')
|
|
AND COALESCE(i.compaction_level, 0) = 0
|
|
AND dt.dependent_id IS NULL -- No open dependents
|
|
GROUP BY i.id
|
|
ORDER BY i.closed_at ASC
|
|
`
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, depthStr, depthStr, daysStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query tier1 candidates: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var candidates []*CompactionCandidate
|
|
for rows.Next() {
|
|
var c CompactionCandidate
|
|
if err := rows.Scan(&c.IssueID, &c.ClosedAt, &c.OriginalSize, &c.EstimatedSize, &c.DependentCount); err != nil {
|
|
return nil, fmt.Errorf("failed to scan candidate: %w", err)
|
|
}
|
|
candidates = append(candidates, &c)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("rows iteration error: %w", err)
|
|
}
|
|
|
|
return candidates, nil
|
|
}
|
|
|
|
// GetTier2Candidates returns issues eligible for Tier 2 compaction.
|
|
// Criteria:
|
|
// - Status = closed
|
|
// - Closed for at least compact_tier2_days
|
|
// - No open dependents within compact_tier2_dep_levels depth
|
|
// - Already at compaction_level = 1
|
|
// - Either has many commits (compact_tier2_commits) or many dependent issues
|
|
func (s *SQLiteStorage) GetTier2Candidates(ctx context.Context) ([]*CompactionCandidate, error) {
|
|
// Get configuration
|
|
daysStr, err := s.GetConfig(ctx, "compact_tier2_days")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get compact_tier2_days: %w", err)
|
|
}
|
|
if daysStr == "" {
|
|
daysStr = "90"
|
|
}
|
|
|
|
depthStr, err := s.GetConfig(ctx, "compact_tier2_dep_levels")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get compact_tier2_dep_levels: %w", err)
|
|
}
|
|
if depthStr == "" {
|
|
depthStr = "5"
|
|
}
|
|
|
|
commitsStr, err := s.GetConfig(ctx, "compact_tier2_commits")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get compact_tier2_commits: %w", err)
|
|
}
|
|
if commitsStr == "" {
|
|
commitsStr = "100"
|
|
}
|
|
|
|
query := `
|
|
WITH event_counts AS (
|
|
SELECT issue_id, COUNT(*) as event_count
|
|
FROM events
|
|
GROUP BY issue_id
|
|
)
|
|
SELECT
|
|
i.id,
|
|
i.closed_at,
|
|
i.original_size,
|
|
0 as estimated_size,
|
|
COALESCE(ec.event_count, 0) as dependent_count
|
|
FROM issues i
|
|
LEFT JOIN event_counts ec ON i.id = ec.issue_id
|
|
WHERE i.status = 'closed'
|
|
AND i.closed_at IS NOT NULL
|
|
AND i.closed_at <= datetime('now', '-' || CAST(? AS INTEGER) || ' days')
|
|
AND i.compaction_level = 1
|
|
AND COALESCE(ec.event_count, 0) >= CAST(? AS INTEGER)
|
|
AND NOT EXISTS (
|
|
-- Check for open dependents
|
|
SELECT 1 FROM dependencies d
|
|
JOIN issues dep ON d.issue_id = dep.id
|
|
WHERE d.depends_on_id = i.id
|
|
AND d.type = 'blocks'
|
|
AND dep.status IN ('open', 'in_progress', 'blocked')
|
|
)
|
|
ORDER BY i.closed_at ASC
|
|
`
|
|
|
|
rows, err := s.db.QueryContext(ctx, query, daysStr, commitsStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query tier2 candidates: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var candidates []*CompactionCandidate
|
|
for rows.Next() {
|
|
var c CompactionCandidate
|
|
if err := rows.Scan(&c.IssueID, &c.ClosedAt, &c.OriginalSize, &c.EstimatedSize, &c.DependentCount); err != nil {
|
|
return nil, fmt.Errorf("failed to scan candidate: %w", err)
|
|
}
|
|
candidates = append(candidates, &c)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("rows iteration error: %w", err)
|
|
}
|
|
|
|
return candidates, nil
|
|
}
|
|
|
|
// CheckEligibility checks if a specific issue is eligible for compaction at the given tier.
|
|
// Returns (eligible, reason, error).
|
|
// If not eligible, reason explains why.
|
|
func (s *SQLiteStorage) CheckEligibility(ctx context.Context, issueID string, tier int) (bool, string, error) {
|
|
// Get the issue
|
|
var status string
|
|
var closedAt sql.NullTime
|
|
var compactionLevel int
|
|
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT status, closed_at, COALESCE(compaction_level, 0)
|
|
FROM issues
|
|
WHERE id = ?
|
|
`, issueID).Scan(&status, &closedAt, &compactionLevel)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return false, "issue not found", nil
|
|
}
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("failed to get issue: %w", err)
|
|
}
|
|
|
|
// Check basic requirements
|
|
if status != "closed" {
|
|
return false, "issue is not closed", nil
|
|
}
|
|
|
|
if !closedAt.Valid {
|
|
return false, "issue has no closed_at timestamp", nil
|
|
}
|
|
|
|
if tier == 1 {
|
|
if compactionLevel != 0 {
|
|
return false, "issue is already compacted", nil
|
|
}
|
|
|
|
// Check if closed long enough
|
|
daysStr, err := s.GetConfig(ctx, "compact_tier1_days")
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("failed to get compact_tier1_days: %w", err)
|
|
}
|
|
if daysStr == "" {
|
|
daysStr = "30"
|
|
}
|
|
|
|
// Check if it appears in tier1 candidates
|
|
candidates, err := s.GetTier1Candidates(ctx)
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("failed to get tier1 candidates: %w", err)
|
|
}
|
|
|
|
for _, c := range candidates {
|
|
if c.IssueID == issueID {
|
|
return true, "", nil
|
|
}
|
|
}
|
|
|
|
return false, "issue has open dependents or not closed long enough", nil
|
|
|
|
} else if tier == 2 {
|
|
if compactionLevel != 1 {
|
|
return false, "issue must be at compaction level 1 for tier 2", nil
|
|
}
|
|
|
|
// Check if it appears in tier2 candidates
|
|
candidates, err := s.GetTier2Candidates(ctx)
|
|
if err != nil {
|
|
return false, "", fmt.Errorf("failed to get tier2 candidates: %w", err)
|
|
}
|
|
|
|
for _, c := range candidates {
|
|
if c.IssueID == issueID {
|
|
return true, "", nil
|
|
}
|
|
}
|
|
|
|
return false, "issue has open dependents, not closed long enough, or insufficient events", nil
|
|
}
|
|
|
|
return false, fmt.Sprintf("invalid tier: %d", tier), nil
|
|
}
|
|
|
|
// CreateSnapshot creates a snapshot of the issue's content before compaction.
|
|
// The snapshot includes all text fields and is stored as JSON.
|
|
// Multiple snapshots can exist per issue (one per compaction level).
|
|
// NOTE: This should be called within the same transaction as the compaction operation.
|
|
func (s *SQLiteStorage) CreateSnapshot(ctx context.Context, issue *types.Issue, level int) error {
|
|
if level <= 0 {
|
|
return fmt.Errorf("invalid compaction level %d; must be >= 1", level)
|
|
}
|
|
|
|
snapshot := Snapshot{
|
|
IssueID: issue.ID,
|
|
CompactionLevel: level,
|
|
Description: issue.Description,
|
|
Design: issue.Design,
|
|
Notes: issue.Notes,
|
|
AcceptanceCriteria: issue.AcceptanceCriteria,
|
|
Title: issue.Title,
|
|
CreatedAt: time.Now().UTC(),
|
|
}
|
|
|
|
snapshotJSON, err := json.Marshal(snapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal snapshot: %w", err)
|
|
}
|
|
|
|
query := `
|
|
INSERT INTO compaction_snapshots (issue_id, compaction_level, snapshot_json, created_at)
|
|
VALUES (?, ?, ?, ?)
|
|
`
|
|
|
|
_, err = s.db.ExecContext(ctx, query, issue.ID, level, snapshotJSON, snapshot.CreatedAt)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert snapshot: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RestoreFromSnapshot restores an issue's content from a snapshot.
|
|
// Returns the exact original content from the snapshot at the specified level.
|
|
// Uses a transaction with optimistic concurrency control to prevent race conditions.
|
|
func (s *SQLiteStorage) RestoreFromSnapshot(ctx context.Context, issueID string, level int) error {
|
|
if level <= 0 {
|
|
return fmt.Errorf("invalid level %d; must be >= 1", level)
|
|
}
|
|
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
}
|
|
}()
|
|
|
|
var snapshotJSON []byte
|
|
err = tx.QueryRowContext(ctx, `
|
|
SELECT snapshot_json
|
|
FROM compaction_snapshots
|
|
WHERE issue_id = ? AND compaction_level = ?
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`, issueID, level).Scan(&snapshotJSON)
|
|
if err == sql.ErrNoRows {
|
|
return fmt.Errorf("no snapshot found for issue %s at level %d", issueID, level)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to query snapshot: %w", err)
|
|
}
|
|
|
|
var snapshot Snapshot
|
|
if err = json.Unmarshal(snapshotJSON, &snapshot); err != nil {
|
|
return fmt.Errorf("failed to unmarshal snapshot: %w", err)
|
|
}
|
|
|
|
if snapshot.IssueID != issueID {
|
|
return fmt.Errorf("snapshot issue_id mismatch: got %s, want %s", snapshot.IssueID, issueID)
|
|
}
|
|
|
|
restoreLevel := snapshot.CompactionLevel - 1
|
|
if restoreLevel < 0 {
|
|
return fmt.Errorf("invalid restore level computed: %d", restoreLevel)
|
|
}
|
|
|
|
res, err := tx.ExecContext(ctx, `
|
|
UPDATE issues
|
|
SET description = ?,
|
|
design = ?,
|
|
notes = ?,
|
|
acceptance_criteria = ?,
|
|
title = ?,
|
|
compaction_level = ?,
|
|
updated_at = ?
|
|
WHERE id = ? AND COALESCE(compaction_level, 0) = ?
|
|
`,
|
|
snapshot.Description,
|
|
snapshot.Design,
|
|
snapshot.Notes,
|
|
snapshot.AcceptanceCriteria,
|
|
snapshot.Title,
|
|
restoreLevel,
|
|
time.Now().UTC(),
|
|
issueID,
|
|
snapshot.CompactionLevel,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to restore issue content: %w", err)
|
|
}
|
|
|
|
rows, _ := res.RowsAffected()
|
|
if rows == 0 {
|
|
return fmt.Errorf("restore conflict: current compaction_level changed or issue not found")
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
return fmt.Errorf("commit restore tx: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetSnapshots returns all snapshots for an issue.
// Results are ordered by compaction level ascending; within each level the
// newest snapshot comes first. Note that EVERY snapshot is returned — if a
// level was snapshotted more than once, callers wanting only the latest per
// level must take the first entry seen for each level.
func (s *SQLiteStorage) GetSnapshots(ctx context.Context, issueID string) ([]*Snapshot, error) {
	query := `
		SELECT snapshot_json
		FROM compaction_snapshots
		WHERE issue_id = ?
		ORDER BY compaction_level ASC, created_at DESC
	`

	rows, err := s.db.QueryContext(ctx, query, issueID)
	if err != nil {
		return nil, fmt.Errorf("failed to query snapshots: %w", err)
	}
	defer rows.Close()

	var snapshots []*Snapshot
	for rows.Next() {
		// Each row holds one JSON-encoded Snapshot payload.
		var snapshotJSON []byte
		if err := rows.Scan(&snapshotJSON); err != nil {
			return nil, fmt.Errorf("failed to scan snapshot: %w", err)
		}

		var snapshot Snapshot
		if err := json.Unmarshal(snapshotJSON, &snapshot); err != nil {
			return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
		}

		snapshots = append(snapshots, &snapshot)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("rows iteration error: %w", err)
	}

	return snapshots, nil
}
|
|
|
|
// ApplyCompaction updates the compaction metadata for an issue after successfully compacting it.
|
|
// This sets compaction_level, compacted_at, and original_size fields.
|
|
func (s *SQLiteStorage) ApplyCompaction(ctx context.Context, issueID string, level int, originalSize int) error {
|
|
now := time.Now().UTC()
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE issues
|
|
SET compaction_level = ?,
|
|
compacted_at = ?,
|
|
original_size = ?,
|
|
updated_at = ?
|
|
WHERE id = ?
|
|
`, level, now, originalSize, now, issueID)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to apply compaction metadata: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|