/{cmd,docs,internal}: support import export for dolt backends

This commit is contained in:
Test
2026-01-21 13:13:24 -08:00
parent 4a0f4abc70
commit b849f598d7
23 changed files with 1837 additions and 226 deletions

View File

@@ -263,14 +263,17 @@ func (s *DoltStore) UpdatePeerLastSync(ctx context.Context, name string) error {
// The caller must hold federationEnvMutex.
func setFederationCredentials(username, password string) func() {
if username != "" {
os.Setenv("DOLT_REMOTE_USER", username)
// Best-effort: failures here should not crash the caller.
_ = os.Setenv("DOLT_REMOTE_USER", username)
}
if password != "" {
os.Setenv("DOLT_REMOTE_PASSWORD", password)
// Best-effort: failures here should not crash the caller.
_ = os.Setenv("DOLT_REMOTE_PASSWORD", password)
}
return func() {
os.Unsetenv("DOLT_REMOTE_USER")
os.Unsetenv("DOLT_REMOTE_PASSWORD")
// Best-effort cleanup.
_ = os.Unsetenv("DOLT_REMOTE_USER")
_ = os.Unsetenv("DOLT_REMOTE_PASSWORD")
}
}

View File

@@ -65,10 +65,26 @@ func (s *DoltStore) GetEvents(ctx context.Context, issueID string, limit int) ([
// AddIssueComment adds a comment to an issue (structured comment)
func (s *DoltStore) AddIssueComment(ctx context.Context, issueID, author, text string) (*types.Comment, error) {
return s.ImportIssueComment(ctx, issueID, author, text, time.Now().UTC())
}
// ImportIssueComment adds a comment during import, preserving the original timestamp.
// This prevents comment timestamp drift across JSONL sync cycles.
func (s *DoltStore) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
// Verify issue exists
var exists bool
if err := s.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM issues WHERE id = ?)`, issueID).Scan(&exists); err != nil {
return nil, fmt.Errorf("failed to check issue existence: %w", err)
}
if !exists {
return nil, fmt.Errorf("issue %s not found", issueID)
}
createdAt = createdAt.UTC()
result, err := s.db.ExecContext(ctx, `
INSERT INTO comments (issue_id, author, text, created_at)
VALUES (?, ?, ?, ?)
`, issueID, author, text, time.Now().UTC())
`, issueID, author, text, createdAt)
if err != nil {
return nil, fmt.Errorf("failed to add comment: %w", err)
}
@@ -78,12 +94,21 @@ func (s *DoltStore) AddIssueComment(ctx context.Context, issueID, author, text s
return nil, fmt.Errorf("failed to get comment id: %w", err)
}
// Mark issue dirty for incremental JSONL export
if _, err := s.db.ExecContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE marked_at = VALUES(marked_at)
`, issueID, time.Now().UTC()); err != nil {
return nil, fmt.Errorf("failed to mark issue dirty: %w", err)
}
return &types.Comment{
ID: id,
IssueID: issueID,
Author: author,
Text: text,
CreatedAt: time.Now().UTC(),
CreatedAt: createdAt,
}, nil
}

View File

@@ -103,6 +103,7 @@ func (s *Server) Start(ctx context.Context) error {
}
// Create command
// #nosec G204 -- dolt binary is fixed; args are derived from internal config.
s.cmd = exec.CommandContext(ctx, "dolt", args...)
s.cmd.Dir = s.cfg.DataDir
@@ -272,6 +273,7 @@ func (s *Server) waitForReady(ctx context.Context) error {
// GetRunningServerPID returns the PID of a running server from the PID file, or 0 if not running
func GetRunningServerPID(dataDir string) int {
pidFile := filepath.Join(dataDir, "dolt-server.pid")
// #nosec G304 -- pidFile is derived from internal dataDir.
data, err := os.ReadFile(pidFile)
if err != nil {
return 0

View File

@@ -17,6 +17,12 @@ type doltTransaction struct {
store *DoltStore
}
// CreateIssueImport is the import-friendly issue creation hook.
// Dolt does not enforce prefix validation at the storage layer, so this delegates to CreateIssue.
func (t *doltTransaction) CreateIssueImport(ctx context.Context, issue *types.Issue, actor string, skipPrefixValidation bool) error {
return t.CreateIssue(ctx, issue, actor)
}
// RunInTransaction executes a function within a database transaction
func (s *DoltStore) RunInTransaction(ctx context.Context, fn func(tx storage.Transaction) error) error {
sqlTx, err := s.db.BeginTx(ctx, nil)
@@ -169,6 +175,36 @@ func (t *doltTransaction) AddDependency(ctx context.Context, dep *types.Dependen
return err
}
func (t *doltTransaction) GetDependencyRecords(ctx context.Context, issueID string) ([]*types.Dependency, error) {
rows, err := t.tx.QueryContext(ctx, `
SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
FROM dependencies
WHERE issue_id = ?
`, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
var deps []*types.Dependency
for rows.Next() {
var d types.Dependency
var metadata sql.NullString
var threadID sql.NullString
if err := rows.Scan(&d.IssueID, &d.DependsOnID, &d.Type, &d.CreatedAt, &d.CreatedBy, &metadata, &threadID); err != nil {
return nil, err
}
if metadata.Valid {
d.Metadata = metadata.String
}
if threadID.Valid {
d.ThreadID = threadID.String
}
deps = append(deps, &d)
}
return deps, rows.Err()
}
// RemoveDependency removes a dependency within the transaction
func (t *doltTransaction) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
_, err := t.tx.ExecContext(ctx, `
@@ -185,6 +221,23 @@ func (t *doltTransaction) AddLabel(ctx context.Context, issueID, label, actor st
return err
}
func (t *doltTransaction) GetLabels(ctx context.Context, issueID string) ([]string, error) {
rows, err := t.tx.QueryContext(ctx, `SELECT label FROM labels WHERE issue_id = ? ORDER BY label`, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
var labels []string
for rows.Next() {
var l string
if err := rows.Scan(&l); err != nil {
return nil, err
}
labels = append(labels, l)
}
return labels, rows.Err()
}
// RemoveLabel removes a label within the transaction
func (t *doltTransaction) RemoveLabel(ctx context.Context, issueID, label, actor string) error {
_, err := t.tx.ExecContext(ctx, `
@@ -231,6 +284,63 @@ func (t *doltTransaction) GetMetadata(ctx context.Context, key string) (string,
return value, err
}
func (t *doltTransaction) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
// Verify issue exists in tx
iss, err := t.GetIssue(ctx, issueID)
if err != nil {
return nil, err
}
if iss == nil {
return nil, fmt.Errorf("issue %s not found", issueID)
}
createdAt = createdAt.UTC()
res, err := t.tx.ExecContext(ctx, `
INSERT INTO comments (issue_id, author, text, created_at)
VALUES (?, ?, ?, ?)
`, issueID, author, text, createdAt)
if err != nil {
return nil, fmt.Errorf("failed to add comment: %w", err)
}
id, err := res.LastInsertId()
if err != nil {
return nil, fmt.Errorf("failed to get comment id: %w", err)
}
// mark dirty in tx
if _, err := t.tx.ExecContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE marked_at = VALUES(marked_at)
`, issueID, time.Now().UTC()); err != nil {
return nil, fmt.Errorf("failed to mark issue dirty: %w", err)
}
return &types.Comment{ID: id, IssueID: issueID, Author: author, Text: text, CreatedAt: createdAt}, nil
}
func (t *doltTransaction) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
rows, err := t.tx.QueryContext(ctx, `
SELECT id, issue_id, author, text, created_at
FROM comments
WHERE issue_id = ?
ORDER BY created_at ASC
`, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
var comments []*types.Comment
for rows.Next() {
var c types.Comment
if err := rows.Scan(&c.ID, &c.IssueID, &c.Author, &c.Text, &c.CreatedAt); err != nil {
return nil, err
}
comments = append(comments, &c)
}
return comments, rows.Err()
}
// AddComment adds a comment within the transaction
func (t *doltTransaction) AddComment(ctx context.Context, issueID, actor, comment string) error {
_, err := t.tx.ExecContext(ctx, `

View File

@@ -1457,6 +1457,24 @@ func (m *MemoryStorage) AddIssueComment(ctx context.Context, issueID, author, te
return comment, nil
}
func (m *MemoryStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
m.mu.Lock()
defer m.mu.Unlock()
comment := &types.Comment{
ID: int64(len(m.comments[issueID]) + 1),
IssueID: issueID,
Author: author,
Text: text,
CreatedAt: createdAt,
}
m.comments[issueID] = append(m.comments[issueID], comment)
m.dirty[issueID] = true
return comment, nil
}
func (m *MemoryStorage) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
m.mu.RLock()
defer m.mu.RUnlock()

View File

@@ -3,6 +3,7 @@ package sqlite
import (
"context"
"fmt"
"time"
"github.com/steveyegge/beads/internal/types"
)
@@ -56,7 +57,7 @@ func (s *SQLiteStorage) AddIssueComment(ctx context.Context, issueID, author, te
// Unlike AddIssueComment which uses CURRENT_TIMESTAMP, this method uses the provided
// createdAt time from the JSONL file. This prevents timestamp drift during sync cycles.
// GH#735: Comment created_at timestamps were being overwritten with current time during import.
func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt string) (*types.Comment, error) {
func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
// Verify issue exists
var exists bool
err := s.db.QueryRowContext(ctx, `SELECT EXISTS(SELECT 1 FROM issues WHERE id = ?)`, issueID).Scan(&exists)
@@ -68,10 +69,11 @@ func (s *SQLiteStorage) ImportIssueComment(ctx context.Context, issueID, author,
}
// Insert comment with provided timestamp
createdAtStr := createdAt.UTC().Format(time.RFC3339Nano)
result, err := s.db.ExecContext(ctx, `
INSERT INTO comments (issue_id, author, text, created_at)
VALUES (?, ?, ?, ?)
`, issueID, author, text, createdAt)
`, issueID, author, text, createdAtStr)
if err != nil {
return nil, fmt.Errorf("failed to insert comment: %w", err)
}

View File

@@ -0,0 +1,115 @@
package sqlite
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/steveyegge/beads/internal/types"
)
// CreateIssueImport creates an issue inside an existing sqlite transaction, optionally skipping
// prefix validation. This is used by JSONL import to support multi-repo mode (GH#686).
func (t *sqliteTxStorage) CreateIssueImport(ctx context.Context, issue *types.Issue, actor string, skipPrefixValidation bool) error {
// Fetch custom statuses and types for validation
customStatuses, err := t.GetCustomStatuses(ctx)
if err != nil {
return fmt.Errorf("failed to get custom statuses: %w", err)
}
customTypes, err := t.GetCustomTypes(ctx)
if err != nil {
return fmt.Errorf("failed to get custom types: %w", err)
}
// Set timestamps
now := time.Now()
if issue.CreatedAt.IsZero() {
issue.CreatedAt = now
}
if issue.UpdatedAt.IsZero() {
issue.UpdatedAt = now
}
// Defensive fix for closed_at invariant
if issue.Status == types.StatusClosed && issue.ClosedAt == nil {
maxTime := issue.CreatedAt
if issue.UpdatedAt.After(maxTime) {
maxTime = issue.UpdatedAt
}
closedAt := maxTime.Add(time.Second)
issue.ClosedAt = &closedAt
}
// Defensive fix for tombstone invariant
if issue.Status == types.StatusTombstone && issue.DeletedAt == nil {
maxTime := issue.CreatedAt
if issue.UpdatedAt.After(maxTime) {
maxTime = issue.UpdatedAt
}
deletedAt := maxTime.Add(time.Second)
issue.DeletedAt = &deletedAt
}
// Validate issue before creating
if err := issue.ValidateWithCustom(customStatuses, customTypes); err != nil {
return fmt.Errorf("validation failed: %w", err)
}
// Compute content hash
if issue.ContentHash == "" {
issue.ContentHash = issue.ComputeContentHash()
}
// Get configured prefix for validation and ID generation behavior
var configPrefix string
err = t.conn.QueryRowContext(ctx, `SELECT value FROM config WHERE key = ?`, "issue_prefix").Scan(&configPrefix)
if err == sql.ErrNoRows || configPrefix == "" {
return fmt.Errorf("database not initialized: issue_prefix config is missing (run 'bd init --prefix <prefix>' first)")
} else if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
prefix := configPrefix
if issue.IDPrefix != "" {
prefix = configPrefix + "-" + issue.IDPrefix
}
if issue.ID == "" {
// Import path expects IDs, but be defensive and generate if missing.
generatedID, err := GenerateIssueID(ctx, t.conn, prefix, issue, actor)
if err != nil {
return fmt.Errorf("failed to generate issue ID: %w", err)
}
issue.ID = generatedID
} else if !skipPrefixValidation {
if err := ValidateIssueIDPrefix(issue.ID, prefix); err != nil {
return fmt.Errorf("failed to validate issue ID prefix: %w", err)
}
}
// Ensure parent exists for hierarchical IDs (importer should have ensured / resurrected).
if isHierarchical, parentID := IsHierarchicalID(issue.ID); isHierarchical {
var parentCount int
if err := t.conn.QueryRowContext(ctx, `SELECT COUNT(*) FROM issues WHERE id = ?`, parentID).Scan(&parentCount); err != nil {
return fmt.Errorf("failed to check parent existence: %w", err)
}
if parentCount == 0 {
return fmt.Errorf("parent issue %s does not exist", parentID)
}
}
// Insert issue (strict)
if err := insertIssueStrict(ctx, t.conn, issue); err != nil {
return fmt.Errorf("failed to insert issue: %w", err)
}
// Record event
if err := recordCreatedEvent(ctx, t.conn, issue, actor); err != nil {
return fmt.Errorf("failed to record creation event: %w", err)
}
// Mark dirty
if err := markDirty(ctx, t.conn, issue.ID); err != nil {
return fmt.Errorf("failed to mark issue dirty: %w", err)
}
return nil
}

View File

@@ -824,6 +824,37 @@ func (t *sqliteTxStorage) AddDependency(ctx context.Context, dep *types.Dependen
return nil
}
// GetDependencyRecords retrieves dependency records for an issue within the transaction.
func (t *sqliteTxStorage) GetDependencyRecords(ctx context.Context, issueID string) ([]*types.Dependency, error) {
rows, err := t.conn.QueryContext(ctx, `
SELECT issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id
FROM dependencies
WHERE issue_id = ?
`, issueID)
if err != nil {
return nil, fmt.Errorf("failed to query dependencies: %w", err)
}
defer func() { _ = rows.Close() }()
var deps []*types.Dependency
for rows.Next() {
var d types.Dependency
var metadata sql.NullString
var threadID sql.NullString
if err := rows.Scan(&d.IssueID, &d.DependsOnID, &d.Type, &d.CreatedAt, &d.CreatedBy, &metadata, &threadID); err != nil {
return nil, fmt.Errorf("failed to scan dependency: %w", err)
}
if metadata.Valid {
d.Metadata = metadata.String
}
if threadID.Valid {
d.ThreadID = threadID.String
}
deps = append(deps, &d)
}
return deps, rows.Err()
}
// RemoveDependency removes a dependency within the transaction.
func (t *sqliteTxStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
// First, check what type of dependency is being removed
@@ -916,6 +947,11 @@ func (t *sqliteTxStorage) AddLabel(ctx context.Context, issueID, label, actor st
return nil
}
// GetLabels retrieves labels for an issue within the transaction.
func (t *sqliteTxStorage) GetLabels(ctx context.Context, issueID string) ([]string, error) {
return t.getLabels(ctx, issueID)
}
// RemoveLabel removes a label from an issue within the transaction.
func (t *sqliteTxStorage) RemoveLabel(ctx context.Context, issueID, label, actor string) error {
result, err := t.conn.ExecContext(ctx, `
@@ -1063,6 +1099,68 @@ func (t *sqliteTxStorage) AddComment(ctx context.Context, issueID, actor, commen
return nil
}
// ImportIssueComment adds a structured comment during import, preserving the original timestamp.
func (t *sqliteTxStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
// Verify issue exists
existing, err := t.GetIssue(ctx, issueID)
if err != nil {
return nil, fmt.Errorf("failed to check issue existence: %w", err)
}
if existing == nil {
return nil, fmt.Errorf("issue %s not found", issueID)
}
createdAtStr := createdAt.UTC().Format(time.RFC3339Nano)
res, err := t.conn.ExecContext(ctx, `
INSERT INTO comments (issue_id, author, text, created_at)
VALUES (?, ?, ?, ?)
`, issueID, author, text, createdAtStr)
if err != nil {
return nil, fmt.Errorf("failed to insert comment: %w", err)
}
commentID, err := res.LastInsertId()
if err != nil {
return nil, fmt.Errorf("failed to get comment ID: %w", err)
}
// Mark issue dirty
if err := markDirty(ctx, t.conn, issueID); err != nil {
return nil, fmt.Errorf("failed to mark issue dirty: %w", err)
}
return &types.Comment{
ID: commentID,
IssueID: issueID,
Author: author,
Text: text,
CreatedAt: createdAt.UTC(),
}, nil
}
// GetIssueComments retrieves structured comments for an issue within the transaction.
func (t *sqliteTxStorage) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
rows, err := t.conn.QueryContext(ctx, `
SELECT id, issue_id, author, text, created_at
FROM comments
WHERE issue_id = ?
ORDER BY created_at ASC
`, issueID)
if err != nil {
return nil, fmt.Errorf("failed to query comments: %w", err)
}
defer func() { _ = rows.Close() }()
var comments []*types.Comment
for rows.Next() {
var c types.Comment
if err := rows.Scan(&c.ID, &c.IssueID, &c.Author, &c.Text, &c.CreatedAt); err != nil {
return nil, fmt.Errorf("failed to scan comment: %w", err)
}
comments = append(comments, &c)
}
return comments, rows.Err()
}
// SearchIssues finds issues matching query and filters within the transaction.
// This enables read-your-writes semantics for searching within a transaction.
func (t *sqliteTxStorage) SearchIssues(ctx context.Context, query string, filter types.IssueFilter) ([]*types.Issue, error) {

View File

@@ -4,6 +4,7 @@ package storage
import (
"context"
"database/sql"
"time"
"github.com/steveyegge/beads/internal/types"
)
@@ -58,10 +59,12 @@ type Transaction interface {
// Dependency operations
AddDependency(ctx context.Context, dep *types.Dependency, actor string) error
RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error
GetDependencyRecords(ctx context.Context, issueID string) ([]*types.Dependency, error)
// Label operations
AddLabel(ctx context.Context, issueID, label, actor string) error
RemoveLabel(ctx context.Context, issueID, label, actor string) error
GetLabels(ctx context.Context, issueID string) ([]string, error)
// Config operations (for atomic config + issue workflows)
SetConfig(ctx context.Context, key, value string) error
@@ -73,6 +76,8 @@ type Transaction interface {
// Comment operations
AddComment(ctx context.Context, issueID, actor, comment string) error
ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error)
GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error)
}
// Storage defines the interface for issue storage backends
@@ -121,6 +126,9 @@ type Storage interface {
// Comments
AddIssueComment(ctx context.Context, issueID, author, text string) (*types.Comment, error)
// ImportIssueComment adds a comment while preserving the original timestamp.
// Used during JSONL import to avoid timestamp drift across sync cycles.
ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error)
GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error)
GetCommentsForIssues(ctx context.Context, issueIDs []string) (map[string][]*types.Comment, error)

View File

@@ -5,6 +5,7 @@ import (
"context"
"database/sql"
"testing"
"time"
"github.com/steveyegge/beads/internal/types"
)
@@ -119,6 +120,9 @@ func (m *mockStorage) GetEvents(ctx context.Context, issueID string, limit int)
func (m *mockStorage) AddIssueComment(ctx context.Context, issueID, author, text string) (*types.Comment, error) {
return nil, nil
}
func (m *mockStorage) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
return nil, nil
}
func (m *mockStorage) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
return nil, nil
}
@@ -237,12 +241,18 @@ func (m *mockTransaction) AddDependency(ctx context.Context, dep *types.Dependen
func (m *mockTransaction) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
return nil
}
func (m *mockTransaction) GetDependencyRecords(ctx context.Context, issueID string) ([]*types.Dependency, error) {
return nil, nil
}
func (m *mockTransaction) AddLabel(ctx context.Context, issueID, label, actor string) error {
return nil
}
func (m *mockTransaction) RemoveLabel(ctx context.Context, issueID, label, actor string) error {
return nil
}
func (m *mockTransaction) GetLabels(ctx context.Context, issueID string) ([]string, error) {
return nil, nil
}
func (m *mockTransaction) SetConfig(ctx context.Context, key, value string) error {
return nil
}
@@ -258,6 +268,12 @@ func (m *mockTransaction) GetMetadata(ctx context.Context, key string) (string,
func (m *mockTransaction) AddComment(ctx context.Context, issueID, actor, comment string) error {
return nil
}
func (m *mockTransaction) ImportIssueComment(ctx context.Context, issueID, author, text string, createdAt time.Time) (*types.Comment, error) {
return nil, nil
}
func (m *mockTransaction) GetIssueComments(ctx context.Context, issueID string) ([]*types.Comment, error) {
return nil, nil
}
// TestConfig verifies the Config struct has expected fields.
func TestConfig(t *testing.T) {