Merge pull request #1240 from coffeegoddd/db/import-export
Enable full-fidelity JSONL import/export for Dolt backend
This commit is contained in:
@@ -3,6 +3,7 @@ package sqlite
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
@@ -56,7 +57,7 @@ func (s *SQLiteStorage) AddIssueComment(ctx context.Context, issueID, author, te
|
||||
// Unlike AddIssueComment which uses CURRENT_TIMESTAMP, this method uses the provided
|
||||
// createdAt time from the JSONL file. This prevents timestamp drift during sync cycles.
|
||||
// GH#735: Comment created_at timestamps were being overwritten with current time during import.
|
||||
func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt string) (*types.Comment, error) {
|
||||
func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
|
||||
// Verify issue exists
|
||||
var exists bool
|
||||
err := s.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM issues WHERE id = ?)`, issueID).Scan(&exists)
|
||||
@@ -68,10 +69,11 @@ func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author,
|
||||
}
|
||||
|
||||
// Insert comment with provided timestamp
|
||||
createdAtStr := createdAt.UTC().Format(time.RFC3339Nano)
|
||||
result, err := s.db.ExecContext(ctx, `
|
||||
INSERT INTO comments (issue_id, author, text, created_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, author, text, createdAt)
|
||||
`, issueID, author, text, createdAtStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to insert comment: %w", err)
|
||||
}
|
||||
|
||||
115
internal/storage/sqlite/import_tx.go
Normal file
115
internal/storage/sqlite/import_tx.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package sqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
|
||||
// CreateIssueImport creates an issue inside an existing sqlite transaction, optionally skipping
|
||||
// prefix validation. This is used by JSONL import to support multi-repo mode (GH#686).
|
||||
func (t *sqliteTxStorage) CreateIssueImport(ctx context.Context, issue *types.Issue, actor string, skipPrefixValidation bool) error {
|
||||
// Fetch custom statuses and types for validation
|
||||
customStatuses, err := t.GetCustomStatuses(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get custom statuses: %w", err)
|
||||
}
|
||||
customTypes, err := t.GetCustomTypes(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get custom types: %w", err)
|
||||
}
|
||||
|
||||
// Set timestamps
|
||||
now := time.Now()
|
||||
if issue.CreatedAt.IsZero() {
|
||||
issue.CreatedAt = now
|
||||
}
|
||||
if issue.UpdatedAt.IsZero() {
|
||||
issue.UpdatedAt = now
|
||||
}
|
||||
|
||||
// Defensive fix for closed_at invariant
|
||||
if issue.Status == types.StatusClosed && issue.ClosedAt == nil {
|
||||
maxTime := issue.CreatedAt
|
||||
if issue.UpdatedAt.After(maxTime) {
|
||||
maxTime = issue.UpdatedAt
|
||||
}
|
||||
closedAt := maxTime.Add(time.Second)
|
||||
issue.ClosedAt = &closedAt
|
||||
}
|
||||
// Defensive fix for tombstone invariant
|
||||
if issue.Status == types.StatusTombstone && issue.DeletedAt == nil {
|
||||
maxTime := issue.CreatedAt
|
||||
if issue.UpdatedAt.After(maxTime) {
|
||||
maxTime = issue.UpdatedAt
|
||||
}
|
||||
deletedAt := maxTime.Add(time.Second)
|
||||
issue.DeletedAt = &deletedAt
|
||||
}
|
||||
|
||||
// Validate issue before creating
|
||||
if err := issue.ValidateWithCustom(customStatuses, customTypes); err != nil {
|
||||
return fmt.Errorf("validation failed: %w", err)
|
||||
}
|
||||
|
||||
// Compute content hash
|
||||
if issue.ContentHash == "" {
|
||||
issue.ContentHash = issue.ComputeContentHash()
|
||||
}
|
||||
|
||||
// Get configured prefix for validation and ID generation behavior
|
||||
var configPrefix string
|
||||
err = t.conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&configPrefix)
|
||||
if err == sql.ErrNoRows || configPrefix == "" {
|
||||
return fmt.Errorf("database not initialized: issue_prefix config is missing (run 'bd init --prefix <prefix>' first)")
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to get config: %w", err)
|
||||
}
|
||||
|
||||
prefix := configPrefix
|
||||
if issue.IDPrefix != "" {
|
||||
prefix = configPrefix + "-" + issue.IDPrefix
|
||||
}
|
||||
|
||||
if issue.ID == "" {
|
||||
// Import path expects IDs, but be defensive and generate if missing.
|
||||
generatedID, err := GenerateIssueID(ctx, t.conn, prefix, issue, actor)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate issue ID: %w", err)
|
||||
}
|
||||
issue.ID = generatedID
|
||||
} else if !skipPrefixValidation {
|
||||
if err := ValidateIssueIDPrefix(issue.ID, prefix); err != nil {
|
||||
return fmt.Errorf("failed to validate issue ID prefix: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure parent exists for hierarchical IDs (importer should have ensured / resurrected).
|
||||
if isHierarchical, parentID := IsHierarchicalID(issue.ID); isHierarchical {
|
||||
var parentCount int
|
||||
if err := t.conn.QueryRowContext(ctx, `SELECT COUNT(*) FROM issues WHERE id = ?`, parentID).Scan(&parentCount); err != nil {
|
||||
return fmt.Errorf("failed to check parent existence: %w", err)
|
||||
}
|
||||
if parentCount == 0 {
|
||||
return fmt.Errorf("parent issue %s does not exist", parentID)
|
||||
}
|
||||
}
|
||||
|
||||
// Insert issue (strict)
|
||||
if err := insertIssueStrict(ctx, t.conn, issue); err != nil {
|
||||
return fmt.Errorf("failed to insert issue: %w", err)
|
||||
}
|
||||
// Record event
|
||||
if err := recordCreatedEvent(ctx, t.conn, issue, actor); err != nil {
|
||||
return fmt.Errorf("failed to record creation event: %w", err)
|
||||
}
|
||||
// Mark dirty
|
||||
if err := markDirty(ctx, t.conn, issue.ID); err != nil {
|
||||
return fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -824,6 +824,37 @@ func (t *sqliteTxStorage) AddDependency(ctx context.Context, dep *types.Dependen
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDependencyRecords retrieves dependency records for an issue within the transaction.
|
||||
func (t *sqliteTxStorage) GetDependencyRecords(ctx context.Context, issueID string) ([]*types.Dependency, error) {
|
||||
rows, err := t.conn.QueryContext(ctx, `
|
||||
SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
|
||||
FROM dependencies
|
||||
WHERE issue_id = ?
|
||||
`, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query dependencies: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
var deps []*types.Dependency
|
||||
for rows.Next() {
|
||||
var d types.Dependency
|
||||
var metadata sql.NullString
|
||||
var threadID sql.NullString
|
||||
if err := rows.Scan(&d.IssueID, &d.DependsOnID, &d.Type, &d.CreatedAt, &d.CreatedBy, &metadata, &threadID); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan dependency: %w", err)
|
||||
}
|
||||
if metadata.Valid {
|
||||
d.Metadata = metadata.String
|
||||
}
|
||||
if threadID.Valid {
|
||||
d.ThreadID = threadID.String
|
||||
}
|
||||
deps = append(deps, &d)
|
||||
}
|
||||
return deps, rows.Err()
|
||||
}
|
||||
|
||||
// RemoveDependency removes a dependency within the transaction.
|
||||
func (t *sqliteTxStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
|
||||
// First, check what type of dependency is being removed
|
||||
@@ -916,6 +947,11 @@ func (t *sqliteTxStorage) AddLabel(ctx context.Context, issueID, label, actor st
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLabels retrieves labels for an issue within the transaction.
|
||||
func (t *sqliteTxStorage) GetLabels(ctx context.Context, issueID string) ([]string, error) {
|
||||
return t.getLabels(ctx, issueID)
|
||||
}
|
||||
|
||||
// RemoveLabel removes a label from an issue within the transaction.
|
||||
func (t *sqliteTxStorage) RemoveLabel(ctx context.Context, issueID, label, actor string) error {
|
||||
result, err := t.conn.ExecContext(ctx, `
|
||||
@@ -1063,6 +1099,68 @@ func (t *sqliteTxStorage) AddComment(ctx context.Context, issueID, actor, commen
|
||||
return nil
|
||||
}
|
||||
|
||||
// ImportIssueComment adds a structured comment during import, preserving the original timestamp.
|
||||
func (t *sqliteTxStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
|
||||
// Verify issue exists
|
||||
existing, err := t.GetIssue(ctx, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to check issue existence: %w", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return nil, fmt.Errorf("issue %s not found", issueID)
|
||||
}
|
||||
|
||||
createdAtStr := createdAt.UTC().Format(time.RFC3339Nano)
|
||||
res, err := t.conn.ExecContext(ctx, `
|
||||
INSERT INTO comments (issue_id, author, text, created_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`, issueID, author, text, createdAtStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to insert comment: %w", err)
|
||||
}
|
||||
commentID, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get comment ID: %w", err)
|
||||
}
|
||||
|
||||
// Mark issue dirty
|
||||
if err := markDirty(ctx, t.conn, issueID); err != nil {
|
||||
return nil, fmt.Errorf("failed to mark issue dirty: %w", err)
|
||||
}
|
||||
|
||||
return &types.Comment{
|
||||
ID: commentID,
|
||||
IssueID: issueID,
|
||||
Author: author,
|
||||
Text: text,
|
||||
CreatedAt: createdAt.UTC(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetIssueComments retrieves structured comments for an issue within the transaction.
|
||||
func (t *sqliteTxStorage) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
|
||||
rows, err := t.conn.QueryContext(ctx, `
|
||||
SELECT id, issue_id, author, text, created_at
|
||||
FROM comments
|
||||
WHERE issue_id = ?
|
||||
ORDER BY created_at ASC
|
||||
`, issueID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query comments: %w", err)
|
||||
}
|
||||
defer func() { _ = rows.Close() }()
|
||||
|
||||
var comments []*types.Comment
|
||||
for rows.Next() {
|
||||
var c types.Comment
|
||||
if err := rows.Scan(&c.ID, &c.IssueID, &c.Author, &c.Text, &c.CreatedAt); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan comment: %w", err)
|
||||
}
|
||||
comments = append(comments, &c)
|
||||
}
|
||||
return comments, rows.Err()
|
||||
}
|
||||
|
||||
// SearchIssues finds issues matching query and filters within the transaction.
|
||||
// This enables read-your-writes semantics for searching within a transaction.
|
||||
func (t *sqliteTxStorage) SearchIssues(ctx context.Context, query string, filter types.IssueFilter) ([]*types.Issue, error) {
|
||||
|
||||
Reference in New Issue
Block a user