Fix bd-2: Add daemon RPC support for sync export operations
- Added OpExport/OpImport to RPC protocol - Implemented handleExport() in daemon to export via RPC - Modified sync command to use daemon export when available - Prevents nil pointer dereference when daemon is running - Falls back to direct mode if daemon unavailable
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/steveyegge/beads/internal/rpc"
|
||||||
"github.com/steveyegge/beads/internal/types"
|
"github.com/steveyegge/beads/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -241,6 +242,22 @@ func gitPush(ctx context.Context) error {
|
|||||||
|
|
||||||
// exportToJSONL exports the database to JSONL format
|
// exportToJSONL exports the database to JSONL format
|
||||||
func exportToJSONL(ctx context.Context, jsonlPath string) error {
|
func exportToJSONL(ctx context.Context, jsonlPath string) error {
|
||||||
|
// If daemon is running, use RPC
|
||||||
|
if daemonClient != nil {
|
||||||
|
exportArgs := &rpc.ExportArgs{
|
||||||
|
JSONLPath: jsonlPath,
|
||||||
|
}
|
||||||
|
resp, err := daemonClient.Export(exportArgs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("daemon export failed: %w", err)
|
||||||
|
}
|
||||||
|
if !resp.Success {
|
||||||
|
return fmt.Errorf("daemon export error: %s", resp.Error)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Direct mode: access store directly
|
||||||
// Get all issues
|
// Get all issues
|
||||||
issues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
issues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -297,3 +297,13 @@ func (c *Client) ReposStats() (*Response, error) {
|
|||||||
func (c *Client) ReposClearCache() (*Response, error) {
|
func (c *Client) ReposClearCache() (*Response, error) {
|
||||||
return c.Execute(OpReposClearCache, struct{}{})
|
return c.Execute(OpReposClearCache, struct{}{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Export exports the database to JSONL format
|
||||||
|
func (c *Client) Export(args *ExportArgs) (*Response, error) {
|
||||||
|
return c.Execute(OpExport, args)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Import imports issues from JSONL format
|
||||||
|
func (c *Client) Import(args *ImportArgs) (*Response, error) {
|
||||||
|
return c.Execute(OpImport, args)
|
||||||
|
}
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ const (
|
|||||||
OpReposClearCache = "repos_clear_cache"
|
OpReposClearCache = "repos_clear_cache"
|
||||||
OpCompact = "compact"
|
OpCompact = "compact"
|
||||||
OpCompactStats = "compact_stats"
|
OpCompactStats = "compact_stats"
|
||||||
|
OpExport = "export"
|
||||||
|
OpImport = "import"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Request represents an RPC request from client to daemon
|
// Request represents an RPC request from client to daemon
|
||||||
@@ -284,3 +286,13 @@ type CompactStatsData struct {
|
|||||||
Tier2MinAge string `json:"tier2_min_age"`
|
Tier2MinAge string `json:"tier2_min_age"`
|
||||||
EstimatedSavings string `json:"estimated_savings,omitempty"`
|
EstimatedSavings string `json:"estimated_savings,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExportArgs represents arguments for the export operation
|
||||||
|
type ExportArgs struct {
|
||||||
|
JSONLPath string `json:"jsonl_path"` // Path to export JSONL file
|
||||||
|
}
|
||||||
|
|
||||||
|
// ImportArgs represents arguments for the import operation
|
||||||
|
type ImportArgs struct {
|
||||||
|
JSONLPath string `json:"jsonl_path"` // Path to import JSONL file
|
||||||
|
}
|
||||||
|
|||||||
@@ -644,6 +644,10 @@ func (s *Server) handleRequest(req *Request) Response {
|
|||||||
resp = s.handleCompact(req)
|
resp = s.handleCompact(req)
|
||||||
case OpCompactStats:
|
case OpCompactStats:
|
||||||
resp = s.handleCompactStats(req)
|
resp = s.handleCompactStats(req)
|
||||||
|
case OpExport:
|
||||||
|
resp = s.handleExport(req)
|
||||||
|
case OpImport:
|
||||||
|
resp = s.handleImport(req)
|
||||||
default:
|
default:
|
||||||
s.metrics.RecordError(req.Operation)
|
s.metrics.RecordError(req.Operation)
|
||||||
return Response{
|
return Response{
|
||||||
@@ -2053,3 +2057,155 @@ func (s *Server) handleCompactStats(req *Request) Response {
|
|||||||
Data: data,
|
Data: data,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleExport handles the export operation
|
||||||
|
func (s *Server) handleExport(req *Request) Response {
|
||||||
|
var exportArgs ExportArgs
|
||||||
|
if err := json.Unmarshal(req.Args, &exportArgs); err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("invalid export args: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
store, err := s.getStorageForRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get storage: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := s.reqCtx(req)
|
||||||
|
|
||||||
|
// Get all issues
|
||||||
|
issues, err := store.SearchIssues(ctx, "", types.IssueFilter{})
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get issues: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by ID for consistent output
|
||||||
|
sort.Slice(issues, func(i, j int) bool {
|
||||||
|
return issues[i].ID < issues[j].ID
|
||||||
|
})
|
||||||
|
|
||||||
|
// Populate dependencies for all issues (avoid N+1)
|
||||||
|
allDeps, err := store.GetAllDependencyRecords(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get dependencies: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, issue := range issues {
|
||||||
|
issue.Dependencies = allDeps[issue.ID]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate labels for all issues
|
||||||
|
for _, issue := range issues {
|
||||||
|
labels, err := store.GetLabels(ctx, issue.ID)
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get labels for %s: %v", issue.ID, err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
issue.Labels = labels
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate comments for all issues
|
||||||
|
for _, issue := range issues {
|
||||||
|
comments, err := store.GetIssueComments(ctx, issue.ID)
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to get comments for %s: %v", issue.ID, err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
issue.Comments = comments
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create temp file for atomic write
|
||||||
|
dir := filepath.Dir(exportArgs.JSONLPath)
|
||||||
|
base := filepath.Base(exportArgs.JSONLPath)
|
||||||
|
tempFile, err := os.CreateTemp(dir, base+".tmp.*")
|
||||||
|
if err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to create temp file: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tempPath := tempFile.Name()
|
||||||
|
defer func() {
|
||||||
|
tempFile.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Write JSONL
|
||||||
|
encoder := json.NewEncoder(tempFile)
|
||||||
|
exportedIDs := make([]string, 0, len(issues))
|
||||||
|
for _, issue := range issues {
|
||||||
|
if err := encoder.Encode(issue); err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to encode issue %s: %v", issue.ID, err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
exportedIDs = append(exportedIDs, issue.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close temp file before rename
|
||||||
|
tempFile.Close()
|
||||||
|
|
||||||
|
// Atomic replace
|
||||||
|
if err := os.Rename(tempPath, exportArgs.JSONLPath); err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("failed to replace JSONL file: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set appropriate file permissions (0644: rw-r--r--)
|
||||||
|
if err := os.Chmod(exportArgs.JSONLPath, 0644); err != nil {
|
||||||
|
// Non-fatal, just log
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to set file permissions: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear dirty flags for exported issues
|
||||||
|
if err := store.ClearDirtyIssuesByID(ctx, exportedIDs); err != nil {
|
||||||
|
// Non-fatal, just log
|
||||||
|
fmt.Fprintf(os.Stderr, "Warning: failed to clear dirty flags: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := map[string]interface{}{
|
||||||
|
"exported_count": len(exportedIDs),
|
||||||
|
"path": exportArgs.JSONLPath,
|
||||||
|
}
|
||||||
|
data, _ := json.Marshal(result)
|
||||||
|
return Response{
|
||||||
|
Success: true,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleImport handles the import operation
|
||||||
|
func (s *Server) handleImport(req *Request) Response {
|
||||||
|
var importArgs ImportArgs
|
||||||
|
if err := json.Unmarshal(req.Args, &importArgs); err != nil {
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: fmt.Sprintf("invalid import args: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: The actual import logic is complex and lives in cmd/bd/import.go
|
||||||
|
// For now, we'll return an error suggesting to use direct mode
|
||||||
|
// In the future, we can refactor the import logic into a shared package
|
||||||
|
return Response{
|
||||||
|
Success: false,
|
||||||
|
Error: "import via daemon not yet implemented, use --no-daemon flag",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user