Add remaining multi-repo files from bd-307
This commit is contained in:
@@ -271,6 +271,18 @@ func runDaemonLoop(interval time.Duration, autoCommit, autoPush bool, logPath, p
|
|||||||
defer func() { _ = store.Close() }()
|
defer func() { _ = store.Close() }()
|
||||||
log.log("Database opened: %s", daemonDBPath)
|
log.log("Database opened: %s", daemonDBPath)
|
||||||
|
|
||||||
|
// Hydrate from multi-repo if configured
|
||||||
|
hydrateCtx := context.Background()
|
||||||
|
if results, err := store.HydrateFromMultiRepo(hydrateCtx); err != nil {
|
||||||
|
log.log("Error: multi-repo hydration failed: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
|
} else if results != nil {
|
||||||
|
log.log("Multi-repo hydration complete:")
|
||||||
|
for repo, count := range results {
|
||||||
|
log.log(" %s: %d issues", repo, count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Validate database fingerprint
|
// Validate database fingerprint
|
||||||
if err := validateDatabaseFingerprint(store, &log); err != nil {
|
if err := validateDatabaseFingerprint(store, &log); err != nil {
|
||||||
if os.Getenv("BEADS_IGNORE_REPO_MISMATCH") != "1" {
|
if os.Getenv("BEADS_IGNORE_REPO_MISMATCH") != "1" {
|
||||||
|
|||||||
@@ -596,6 +596,14 @@ func createConfigYaml(beadsDir string, noDbMode bool) error {
|
|||||||
# Debounce interval for auto-flush (can also use BEADS_FLUSH_DEBOUNCE)
|
# Debounce interval for auto-flush (can also use BEADS_FLUSH_DEBOUNCE)
|
||||||
# flush-debounce: "5s"
|
# flush-debounce: "5s"
|
||||||
|
|
||||||
|
# Multi-repo configuration (experimental - bd-307)
|
||||||
|
# Allows hydrating from multiple repositories and routing writes to the correct JSONL
|
||||||
|
# repos:
|
||||||
|
# primary: "." # Primary repo (where this database lives)
|
||||||
|
# additional: # Additional repos to hydrate from (read-only)
|
||||||
|
# - ~/beads-planning # Personal planning repo
|
||||||
|
# - ~/work-planning # Work planning repo
|
||||||
|
|
||||||
# Integration settings (access with 'bd config get/set')
|
# Integration settings (access with 'bd config get/set')
|
||||||
# These are stored in the database, not in this file:
|
# These are stored in the database, not in this file:
|
||||||
# - jira.url
|
# - jira.url
|
||||||
|
|||||||
184
internal/storage/sqlite/multirepo_export.go
Normal file
184
internal/storage/sqlite/multirepo_export.go
Normal file
@@ -0,0 +1,184 @@
|
|||||||
|
// Package sqlite implements multi-repo export for the SQLite storage backend.
|
||||||
|
package sqlite
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/steveyegge/beads/internal/config"
|
||||||
|
"github.com/steveyegge/beads/internal/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ExportToMultiRepo writes issues to their respective JSONL files based on source_repo.
|
||||||
|
// Issues are grouped by source_repo and written atomically to each repository.
|
||||||
|
// Returns a map of repo path -> exported issue count.
|
||||||
|
// Returns nil with no error if not in multi-repo mode (backward compatibility).
|
||||||
|
func (s *SQLiteStorage) ExportToMultiRepo(ctx context.Context) (map[string]int, error) {
|
||||||
|
// Get multi-repo config
|
||||||
|
multiRepo := config.GetMultiRepoConfig()
|
||||||
|
if multiRepo == nil {
|
||||||
|
// Single-repo mode - not an error, just no-op
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all issues
|
||||||
|
allIssues, err := s.SearchIssues(ctx, "", types.IssueFilter{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to query issues: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate dependencies for all issues (avoid N+1)
|
||||||
|
allDeps, err := s.GetAllDependencyRecords(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get dependencies: %w", err)
|
||||||
|
}
|
||||||
|
for _, issue := range allIssues {
|
||||||
|
issue.Dependencies = allDeps[issue.ID]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate labels for all issues
|
||||||
|
for _, issue := range allIssues {
|
||||||
|
labels, err := s.GetLabels(ctx, issue.ID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get labels for %s: %w", issue.ID, err)
|
||||||
|
}
|
||||||
|
issue.Labels = labels
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group issues by source_repo
|
||||||
|
issuesByRepo := make(map[string][]*types.Issue)
|
||||||
|
for _, issue := range allIssues {
|
||||||
|
sourceRepo := issue.SourceRepo
|
||||||
|
if sourceRepo == "" {
|
||||||
|
sourceRepo = "." // Default to primary repo
|
||||||
|
}
|
||||||
|
issuesByRepo[sourceRepo] = append(issuesByRepo[sourceRepo], issue)
|
||||||
|
}
|
||||||
|
|
||||||
|
results := make(map[string]int)
|
||||||
|
|
||||||
|
// Export primary repo
|
||||||
|
if issues, ok := issuesByRepo["."]; ok {
|
||||||
|
repoPath := multiRepo.Primary
|
||||||
|
if repoPath == "" {
|
||||||
|
repoPath = "."
|
||||||
|
}
|
||||||
|
count, err := s.exportToRepo(ctx, repoPath, issues)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to export primary repo: %w", err)
|
||||||
|
}
|
||||||
|
results["."] = count
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export additional repos
|
||||||
|
for _, repoPath := range multiRepo.Additional {
|
||||||
|
issues := issuesByRepo[repoPath]
|
||||||
|
if len(issues) == 0 {
|
||||||
|
// No issues for this repo - write empty JSONL to keep in sync
|
||||||
|
count, err := s.exportToRepo(ctx, repoPath, []*types.Issue{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to export repo %s: %w", repoPath, err)
|
||||||
|
}
|
||||||
|
results[repoPath] = count
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
count, err := s.exportToRepo(ctx, repoPath, issues)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to export repo %s: %w", repoPath, err)
|
||||||
|
}
|
||||||
|
results[repoPath] = count
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// exportToRepo writes issues to a single repository's JSONL file atomically.
|
||||||
|
func (s *SQLiteStorage) exportToRepo(ctx context.Context, repoPath string, issues []*types.Issue) (int, error) {
|
||||||
|
// Expand tilde in path
|
||||||
|
expandedPath, err := expandTilde(repoPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to expand path: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get absolute path
|
||||||
|
absRepoPath, err := filepath.Abs(expandedPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to get absolute path: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct JSONL path
|
||||||
|
jsonlPath := filepath.Join(absRepoPath, ".beads", "issues.jsonl")
|
||||||
|
|
||||||
|
// Ensure .beads directory exists
|
||||||
|
beadsDir := filepath.Dir(jsonlPath)
|
||||||
|
if err := os.MkdirAll(beadsDir, 0755); err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to create .beads directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort issues by ID for consistent output
|
||||||
|
sort.Slice(issues, func(i, j int) bool {
|
||||||
|
return issues[i].ID < issues[j].ID
|
||||||
|
})
|
||||||
|
|
||||||
|
// Write atomically using temp file + rename
|
||||||
|
tempPath := fmt.Sprintf("%s.tmp.%d", jsonlPath, os.Getpid())
|
||||||
|
f, err := os.Create(tempPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to create temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure cleanup on failure
|
||||||
|
defer func() {
|
||||||
|
if f != nil {
|
||||||
|
_ = f.Close()
|
||||||
|
_ = os.Remove(tempPath)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Write JSONL
|
||||||
|
encoder := json.NewEncoder(f)
|
||||||
|
for _, issue := range issues {
|
||||||
|
if err := encoder.Encode(issue); err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to encode issue %s: %w", issue.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close before rename
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
return 0, fmt.Errorf("failed to close temp file: %w", err)
|
||||||
|
}
|
||||||
|
f = nil // Prevent defer cleanup
|
||||||
|
|
||||||
|
// Atomic rename
|
||||||
|
if err := os.Rename(tempPath, jsonlPath); err != nil {
|
||||||
|
_ = os.Remove(tempPath)
|
||||||
|
return 0, fmt.Errorf("failed to rename temp file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set file permissions
|
||||||
|
if err := os.Chmod(jsonlPath, 0644); err != nil {
|
||||||
|
// Non-fatal
|
||||||
|
if os.Getenv("BD_DEBUG") != "" {
|
||||||
|
fmt.Fprintf(os.Stderr, "Debug: failed to set permissions on %s: %v\n", jsonlPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update mtime cache for this repo
|
||||||
|
fileInfo, err := os.Stat(jsonlPath)
|
||||||
|
if err == nil {
|
||||||
|
_, err = s.db.ExecContext(ctx, `
|
||||||
|
INSERT OR REPLACE INTO repo_mtimes (repo_path, jsonl_path, mtime_ns, last_checked)
|
||||||
|
VALUES (?, ?, ?, datetime('now'))
|
||||||
|
`, absRepoPath, jsonlPath, fileInfo.ModTime().UnixNano())
|
||||||
|
if err != nil && os.Getenv("BD_DEBUG") != "" {
|
||||||
|
fmt.Fprintf(os.Stderr, "Debug: failed to update mtime cache for %s: %v\n", absRepoPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return len(issues), nil
|
||||||
|
}
|
||||||
@@ -373,3 +373,139 @@ func TestImportJSONLFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExportToMultiRepo(t *testing.T) {
|
||||||
|
t.Run("returns nil in single-repo mode", func(t *testing.T) {
|
||||||
|
store, cleanup := setupTestDB(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Initialize config fresh
|
||||||
|
if err := config.Initialize(); err != nil {
|
||||||
|
t.Fatalf("failed to initialize config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear any multi-repo config from previous tests
|
||||||
|
config.Set("repos.primary", "")
|
||||||
|
config.Set("repos.additional", nil)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
results, err := store.ExportToMultiRepo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error in single-repo mode: %v", err)
|
||||||
|
}
|
||||||
|
if results != nil {
|
||||||
|
t.Errorf("expected nil results in single-repo mode, got %v", results)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("exports issues to correct repos", func(t *testing.T) {
|
||||||
|
store, cleanup := setupTestDB(t)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
// Initialize config
|
||||||
|
if err := config.Initialize(); err != nil {
|
||||||
|
t.Fatalf("failed to initialize config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create temporary repos
|
||||||
|
primaryDir := t.TempDir()
|
||||||
|
additionalDir := t.TempDir()
|
||||||
|
|
||||||
|
// Create .beads directories
|
||||||
|
primaryBeadsDir := filepath.Join(primaryDir, ".beads")
|
||||||
|
additionalBeadsDir := filepath.Join(additionalDir, ".beads")
|
||||||
|
if err := os.MkdirAll(primaryBeadsDir, 0755); err != nil {
|
||||||
|
t.Fatalf("failed to create primary .beads dir: %v", err)
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(additionalBeadsDir, 0755); err != nil {
|
||||||
|
t.Fatalf("failed to create additional .beads dir: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set multi-repo config
|
||||||
|
config.Set("repos.primary", primaryDir)
|
||||||
|
config.Set("repos.additional", []string{additionalDir})
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Create issues with different source_repos
|
||||||
|
issue1 := &types.Issue{
|
||||||
|
ID: "bd-primary-1",
|
||||||
|
Title: "Primary Issue",
|
||||||
|
Status: types.StatusOpen,
|
||||||
|
Priority: 1,
|
||||||
|
IssueType: types.TypeTask,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
SourceRepo: ".",
|
||||||
|
}
|
||||||
|
issue1.ContentHash = issue1.ComputeContentHash()
|
||||||
|
|
||||||
|
issue2 := &types.Issue{
|
||||||
|
ID: "bd-additional-1",
|
||||||
|
Title: "Additional Issue",
|
||||||
|
Status: types.StatusOpen,
|
||||||
|
Priority: 1,
|
||||||
|
IssueType: types.TypeTask,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
SourceRepo: additionalDir,
|
||||||
|
}
|
||||||
|
issue2.ContentHash = issue2.ComputeContentHash()
|
||||||
|
|
||||||
|
// Insert issues
|
||||||
|
if err := store.CreateIssue(ctx, issue1, "test"); err != nil {
|
||||||
|
t.Fatalf("failed to create primary issue: %v", err)
|
||||||
|
}
|
||||||
|
if err := store.CreateIssue(ctx, issue2, "test"); err != nil {
|
||||||
|
t.Fatalf("failed to create additional issue: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export to multi-repo
|
||||||
|
results, err := store.ExportToMultiRepo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ExportToMultiRepo() error = %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify export counts
|
||||||
|
if results["."] != 1 {
|
||||||
|
t.Errorf("expected 1 issue exported to primary, got %d", results["."])
|
||||||
|
}
|
||||||
|
if results[additionalDir] != 1 {
|
||||||
|
t.Errorf("expected 1 issue exported to additional, got %d", results[additionalDir])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify JSONL files exist and contain correct issues
|
||||||
|
primaryJSONL := filepath.Join(primaryBeadsDir, "issues.jsonl")
|
||||||
|
additionalJSONL := filepath.Join(additionalBeadsDir, "issues.jsonl")
|
||||||
|
|
||||||
|
// Check primary JSONL
|
||||||
|
f1, err := os.Open(primaryJSONL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to open primary JSONL: %v", err)
|
||||||
|
}
|
||||||
|
defer f1.Close()
|
||||||
|
|
||||||
|
var primaryIssue types.Issue
|
||||||
|
if err := json.NewDecoder(f1).Decode(&primaryIssue); err != nil {
|
||||||
|
t.Fatalf("failed to decode primary issue: %v", err)
|
||||||
|
}
|
||||||
|
if primaryIssue.ID != "bd-primary-1" {
|
||||||
|
t.Errorf("expected bd-primary-1 in primary JSONL, got %s", primaryIssue.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check additional JSONL
|
||||||
|
f2, err := os.Open(additionalJSONL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to open additional JSONL: %v", err)
|
||||||
|
}
|
||||||
|
defer f2.Close()
|
||||||
|
|
||||||
|
var additionalIssue types.Issue
|
||||||
|
if err := json.NewDecoder(f2).Decode(&additionalIssue); err != nil {
|
||||||
|
t.Fatalf("failed to decode additional issue: %v", err)
|
||||||
|
}
|
||||||
|
if additionalIssue.ID != "bd-additional-1" {
|
||||||
|
t.Errorf("expected bd-additional-1 in additional JSONL, got %s", additionalIssue.ID)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user