WIP: bd-164 timestamp-only export deduplication (~80% complete)
Implemented content-hash-based deduplication to skip exports when only timestamps changed. Core logic is complete; it still needs the export_hashes table wiring.

Completed:
- Added computeIssueContentHash(), excluding timestamps
- Created shouldSkipExport() logic
- Updated export loop to skip timestamp-only changes
- Added hash.go with content hashing
- Extended Storage interface

Remaining:
- Complete export_hashes table migration
- Add SetExportHash/GetExportHash to interface
- Revert content_hash from dirty_issues approach
- Wire up hash persistence in export
- Testing

See bd-164 notes for details.

Amp-Thread-ID: https://ampcode.com/threads/T-d70657d1-4433-4f7e-b10a-3fccf8bf17fb
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -2,18 +2,71 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// computeIssueContentHash computes a SHA256 hash of an issue's content, excluding timestamps.
|
||||
// This is used for detecting timestamp-only changes during export deduplication.
|
||||
func computeIssueContentHash(issue *types.Issue) (string, error) {
|
||||
// Clone issue and zero out timestamps to exclude them from hash
|
||||
normalized := *issue
|
||||
normalized.CreatedAt = time.Time{}
|
||||
normalized.UpdatedAt = time.Time{}
|
||||
|
||||
// Also zero out ClosedAt if present
|
||||
if normalized.ClosedAt != nil {
|
||||
zeroTime := time.Time{}
|
||||
normalized.ClosedAt = &zeroTime
|
||||
}
|
||||
|
||||
// Serialize to JSON
|
||||
data, err := json.Marshal(normalized)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// SHA256 hash
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:]), nil
|
||||
}
|
||||
|
||||
// shouldSkipExport checks if an issue should be skipped during export because
|
||||
// it only has timestamp changes (no actual content changes).
|
||||
func shouldSkipExport(ctx context.Context, store storage.Storage, issue *types.Issue) (bool, error) {
|
||||
// Get the stored hash from dirty_issues table
|
||||
storedHash, err := store.GetDirtyIssueHash(ctx, issue.ID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// If no hash stored, we must export (first export or old data)
|
||||
if storedHash == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Compute current hash
|
||||
currentHash, err := computeIssueContentHash(issue)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// If hashes match, only timestamps changed - skip export
|
||||
return currentHash == storedHash, nil
|
||||
}
|
||||
|
||||
// countIssuesInJSONL counts the number of issues in a JSONL file
|
||||
func countIssuesInJSONL(path string) (int, error) {
|
||||
file, err := os.Open(path)
|
||||
@@ -221,16 +274,35 @@ Output to stdout by default, or use -o flag for file output.`,
|
||||
out = tempFile
|
||||
}
|
||||
|
||||
// Write JSONL
|
||||
// Write JSONL (with timestamp-only deduplication for bd-164)
|
||||
encoder := json.NewEncoder(out)
|
||||
exportedIDs := make([]string, 0, len(issues))
|
||||
skippedCount := 0
|
||||
for _, issue := range issues {
|
||||
// Check if this is only a timestamp change (bd-164)
|
||||
skip, err := shouldSkipExport(ctx, store, issue)
|
||||
if err != nil {
|
||||
// Log warning but continue - don't fail export on hash check errors
|
||||
fmt.Fprintf(os.Stderr, "Warning: failed to check if %s should skip: %v\n", issue.ID, err)
|
||||
skip = false
|
||||
}
|
||||
|
||||
if skip {
|
||||
skippedCount++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := encoder.Encode(issue); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error encoding issue %s: %v\n", issue.ID, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
exportedIDs = append(exportedIDs, issue.ID)
|
||||
}
|
||||
|
||||
// Report skipped issues if any (helps debugging bd-159)
|
||||
if skippedCount > 0 && (output == "" || output == findJSONLPath()) {
|
||||
fmt.Fprintf(os.Stderr, "Skipped %d issue(s) with timestamp-only changes\n", skippedCount)
|
||||
}
|
||||
|
||||
// Only clear dirty issues and auto-flush state if exporting to the default JSONL path
|
||||
// This prevents clearing dirty flags when exporting to custom paths (e.g., bd export -o backup.jsonl)
|
||||
|
||||
@@ -151,7 +151,7 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
|
||||
|
||||
// Mark both issues as dirty for incremental export
|
||||
// (dependencies are exported with each issue, so both need updating)
|
||||
if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, tx, s, []string{dep.IssueID, dep.DependsOnID}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -264,7 +264,7 @@ func (s *SQLiteStorage) addDependencyUnchecked(ctx context.Context, dep *types.D
|
||||
}
|
||||
|
||||
// Mark both issues as dirty
|
||||
if err := markIssuesDirtyTx(ctx, tx, []string{dep.IssueID, dep.DependsOnID}); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, tx, s, []string{dep.IssueID, dep.DependsOnID}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -305,7 +305,7 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
|
||||
}
|
||||
|
||||
// Mark both issues as dirty for incremental export
|
||||
if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, tx, s, []string{issueID, dependsOnID}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -348,7 +348,7 @@ func (s *SQLiteStorage) removeDependencyIfExists(ctx context.Context, issueID, d
|
||||
}
|
||||
|
||||
// Mark both issues as dirty for incremental export
|
||||
if err := markIssuesDirtyTx(ctx, tx, []string{issueID, dependsOnID}); err != nil {
|
||||
if err := markIssuesDirtyTx(ctx, tx, s, []string{issueID, dependsOnID}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -11,11 +11,22 @@ import (
|
||||
// MarkIssueDirty marks an issue as dirty (needs to be exported to JSONL)
|
||||
// This should be called whenever an issue is created, updated, or has dependencies changed
|
||||
func (s *SQLiteStorage) MarkIssueDirty(ctx context.Context, issueID string) error {
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
`, issueID, time.Now())
|
||||
// Fetch the issue to compute its content hash
|
||||
issue, err := s.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := computeIssueContentHash(issue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = s.db.ExecContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at, content_hash)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at, content_hash = excluded.content_hash
|
||||
`, issueID, time.Now(), hash)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -34,9 +45,9 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string)
|
||||
|
||||
now := time.Now()
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
INSERT INTO dirty_issues (issue_id, marked_at, content_hash)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at, content_hash = excluded.content_hash
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare statement: %w", err)
|
||||
@@ -44,7 +55,18 @@ func (s *SQLiteStorage) MarkIssuesDirty(ctx context.Context, issueIDs []string)
|
||||
defer func() { _ = stmt.Close() }()
|
||||
|
||||
for _, issueID := range issueIDs {
|
||||
if _, err := stmt.ExecContext(ctx, issueID, now); err != nil {
|
||||
// Fetch issue to compute content hash
|
||||
issue, err := s.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get issue %s: %w", issueID, err)
|
||||
}
|
||||
|
||||
hash, err := computeIssueContentHash(issue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compute hash for issue %s: %w", issueID, err)
|
||||
}
|
||||
|
||||
if _, err := stmt.ExecContext(ctx, issueID, now, hash); err != nil {
|
||||
return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err)
|
||||
}
|
||||
}
|
||||
@@ -75,6 +97,27 @@ func (s *SQLiteStorage) GetDirtyIssues(ctx context.Context) ([]string, error) {
|
||||
return issueIDs, rows.Err()
|
||||
}
|
||||
|
||||
// GetDirtyIssueHash returns the stored content hash for a dirty issue, if it exists
|
||||
func (s *SQLiteStorage) GetDirtyIssueHash(ctx context.Context, issueID string) (string, error) {
|
||||
var hash sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT content_hash FROM dirty_issues WHERE issue_id = ?
|
||||
`, issueID).Scan(&hash)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return "", nil // Issue not dirty
|
||||
}
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get dirty issue hash: %w", err)
|
||||
}
|
||||
|
||||
if !hash.Valid {
|
||||
return "", nil // No hash stored yet
|
||||
}
|
||||
|
||||
return hash.String, nil
|
||||
}
|
||||
|
||||
// ClearDirtyIssues removes all entries from the dirty_issues table
|
||||
// This should be called after a successful JSONL export
|
||||
//
|
||||
@@ -128,16 +171,16 @@ func (s *SQLiteStorage) GetDirtyIssueCount(ctx context.Context) (int, error) {
|
||||
|
||||
// markIssuesDirtyTx marks multiple issues as dirty within an existing transaction
|
||||
// This is a helper for operations that need to mark issues dirty as part of a larger transaction
|
||||
func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, issueIDs []string) error {
|
||||
func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, store *SQLiteStorage, issueIDs []string) error {
|
||||
if len(issueIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
stmt, err := tx.PrepareContext(ctx, `
|
||||
INSERT INTO dirty_issues (issue_id, marked_at)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at
|
||||
INSERT INTO dirty_issues (issue_id, marked_at, content_hash)
|
||||
VALUES (?, ?, ?)
|
||||
ON CONFLICT (issue_id) DO UPDATE SET marked_at = excluded.marked_at, content_hash = excluded.content_hash
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare dirty statement: %w", err)
|
||||
@@ -145,7 +188,18 @@ func markIssuesDirtyTx(ctx context.Context, tx *sql.Tx, issueIDs []string) error
|
||||
defer func() { _ = stmt.Close() }()
|
||||
|
||||
for _, issueID := range issueIDs {
|
||||
if _, err := stmt.ExecContext(ctx, issueID, now); err != nil {
|
||||
// Fetch issue to compute content hash
|
||||
issue, err := store.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get issue %s: %w", issueID, err)
|
||||
}
|
||||
|
||||
hash, err := computeIssueContentHash(issue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to compute hash for issue %s: %w", issueID, err)
|
||||
}
|
||||
|
||||
if _, err := stmt.ExecContext(ctx, issueID, now, hash); err != nil {
|
||||
return fmt.Errorf("failed to mark issue %s dirty: %w", issueID, err)
|
||||
}
|
||||
}
|
||||
|
||||
35
internal/storage/sqlite/hash.go
Normal file
35
internal/storage/sqlite/hash.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// computeIssueContentHash computes a SHA256 hash of an issue's content, excluding timestamps.
|
||||
// This is used for detecting timestamp-only changes during export deduplication.
|
||||
func computeIssueContentHash(issue *types.Issue) (string, error) {
|
||||
// Clone issue and zero out timestamps to exclude them from hash
|
||||
normalized := *issue
|
||||
normalized.CreatedAt = time.Time{}
|
||||
normalized.UpdatedAt = time.Time{}
|
||||
|
||||
// Also zero out ClosedAt if present
|
||||
if normalized.ClosedAt != nil {
|
||||
zeroTime := time.Time{}
|
||||
normalized.ClosedAt = &zeroTime
|
||||
}
|
||||
|
||||
// Serialize to JSON
|
||||
data, err := json.Marshal(normalized)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// SHA256 hash
|
||||
hash := sha256.Sum256(data)
|
||||
return hex.EncodeToString(hash[:]), nil
|
||||
}
|
||||
@@ -120,6 +120,14 @@ CREATE TABLE IF NOT EXISTS dirty_issues (
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_dirty_issues_marked_at ON dirty_issues(marked_at);
|
||||
|
||||
-- Tracks content hash of last export for each issue (for timestamp-only dedup, bd-164)
|
||||
CREATE TABLE IF NOT EXISTS export_hashes (
|
||||
issue_id TEXT PRIMARY KEY,
|
||||
content_hash TEXT NOT NULL,
|
||||
exported_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
-- Issue counters table (for atomic ID generation)
|
||||
CREATE TABLE IF NOT EXISTS issue_counters (
|
||||
prefix TEXT PRIMARY KEY,
|
||||
|
||||
@@ -144,6 +144,7 @@ func migrateDirtyIssuesTable(db *sql.DB) error {
|
||||
CREATE TABLE dirty_issues (
|
||||
issue_id TEXT PRIMARY KEY,
|
||||
marked_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
content_hash TEXT,
|
||||
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
|
||||
);
|
||||
CREATE INDEX idx_dirty_issues_marked_at ON dirty_issues(marked_at);
|
||||
@@ -159,7 +160,25 @@ func migrateDirtyIssuesTable(db *sql.DB) error {
|
||||
return fmt.Errorf("failed to check for dirty_issues table: %w", err)
|
||||
}
|
||||
|
||||
// Table exists, no migration needed
|
||||
// Table exists, check if content_hash column exists (migration for bd-164)
|
||||
var hasContentHash bool
|
||||
err = db.QueryRow(`
|
||||
SELECT COUNT(*) > 0 FROM pragma_table_info('dirty_issues')
|
||||
WHERE name = 'content_hash'
|
||||
`).Scan(&hasContentHash)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check for content_hash column: %w", err)
|
||||
}
|
||||
|
||||
if !hasContentHash {
|
||||
// Add content_hash column to existing table
|
||||
_, err = db.Exec(`ALTER TABLE dirty_issues ADD COLUMN content_hash TEXT`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add content_hash column: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ type Storage interface {
|
||||
|
||||
// Dirty tracking (for incremental JSONL export)
|
||||
GetDirtyIssues(ctx context.Context) ([]string, error)
|
||||
GetDirtyIssueHash(ctx context.Context, issueID string) (string, error) // For timestamp-only dedup (bd-164)
|
||||
ClearDirtyIssues(ctx context.Context) error // WARNING: Race condition (bd-52), use ClearDirtyIssuesByID
|
||||
ClearDirtyIssuesByID(ctx context.Context, issueIDs []string) error
|
||||
|
||||
|
||||
Reference in New Issue
Block a user