Add RPC support for compact command (bd-184)
- Added OpCompact and OpCompactStats operation constants - Added CompactArgs, CompactStatsArgs, and response types to RPC protocol - Implemented handleCompact and handleCompactStats in RPC server - Updated compact command to use RPC when daemon is available - Fixed RPC client to include Cwd for proper database routing - Compact now works in daemon mode with --no-daemon flag Amp-Thread-ID: https://ampcode.com/threads/T-87885d07-80ad-466d-9ffb-cc96fab4853f Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
File diff suppressed because one or more lines are too long
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
@@ -44,17 +45,34 @@ Examples:
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Handle compact stats first
|
||||
if compactStats {
|
||||
if daemonClient != nil {
|
||||
runCompactStatsRPC(ctx)
|
||||
} else {
|
||||
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
||||
if !ok {
|
||||
fmt.Fprintf(os.Stderr, "Error: compact requires SQLite storage\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
runCompactStats(ctx, sqliteStore)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// If using daemon, delegate to RPC
|
||||
if daemonClient != nil {
|
||||
runCompactRPC(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// Direct mode - original logic
|
||||
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
||||
if !ok {
|
||||
fmt.Fprintf(os.Stderr, "Error: compact requires SQLite storage\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if compactStats {
|
||||
runCompactStats(ctx, sqliteStore)
|
||||
return
|
||||
}
|
||||
|
||||
if compactID != "" && compactAll {
|
||||
fmt.Fprintf(os.Stderr, "Error: cannot use --id and --all together\n")
|
||||
os.Exit(1)
|
||||
@@ -385,6 +403,164 @@ func progressBar(current, total int) string {
|
||||
return "[" + bar + "]"
|
||||
}
|
||||
|
||||
func runCompactRPC(ctx context.Context) {
|
||||
if compactID != "" && compactAll {
|
||||
fmt.Fprintf(os.Stderr, "Error: cannot use --id and --all together\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if compactForce && compactID == "" {
|
||||
fmt.Fprintf(os.Stderr, "Error: --force requires --id\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if compactID == "" && !compactAll && !compactDryRun {
|
||||
fmt.Fprintf(os.Stderr, "Error: must specify --all, --id, or --dry-run\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
apiKey := os.Getenv("ANTHROPIC_API_KEY")
|
||||
if apiKey == "" && !compactDryRun {
|
||||
fmt.Fprintf(os.Stderr, "Error: ANTHROPIC_API_KEY environment variable not set\n")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
args := map[string]interface{}{
|
||||
"tier": compactTier,
|
||||
"dry_run": compactDryRun,
|
||||
"force": compactForce,
|
||||
"all": compactAll,
|
||||
"api_key": apiKey,
|
||||
"workers": compactWorkers,
|
||||
"batch_size": compactBatch,
|
||||
}
|
||||
if compactID != "" {
|
||||
args["issue_id"] = compactID
|
||||
}
|
||||
|
||||
resp, err := daemonClient.Execute("compact", args)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if !resp.Success {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n", resp.Error)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if jsonOutput {
|
||||
fmt.Println(string(resp.Data))
|
||||
return
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Success bool `json:"success"`
|
||||
IssueID string `json:"issue_id,omitempty"`
|
||||
OriginalSize int `json:"original_size,omitempty"`
|
||||
CompactedSize int `json:"compacted_size,omitempty"`
|
||||
Reduction string `json:"reduction,omitempty"`
|
||||
Duration string `json:"duration,omitempty"`
|
||||
DryRun bool `json:"dry_run,omitempty"`
|
||||
Results []struct {
|
||||
IssueID string `json:"issue_id"`
|
||||
Success bool `json:"success"`
|
||||
Error string `json:"error,omitempty"`
|
||||
OriginalSize int `json:"original_size,omitempty"`
|
||||
CompactedSize int `json:"compacted_size,omitempty"`
|
||||
Reduction string `json:"reduction,omitempty"`
|
||||
} `json:"results,omitempty"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(resp.Data, &result); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error parsing response: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if compactID != "" {
|
||||
if result.DryRun {
|
||||
fmt.Printf("DRY RUN - Tier %d compaction\n\n", compactTier)
|
||||
fmt.Printf("Issue: %s\n", compactID)
|
||||
fmt.Printf("Original size: %d bytes\n", result.OriginalSize)
|
||||
fmt.Printf("Estimated reduction: %s\n", result.Reduction)
|
||||
} else {
|
||||
fmt.Printf("Successfully compacted %s\n", result.IssueID)
|
||||
fmt.Printf("Original size: %d bytes\n", result.OriginalSize)
|
||||
fmt.Printf("Compacted size: %d bytes\n", result.CompactedSize)
|
||||
fmt.Printf("Reduction: %s\n", result.Reduction)
|
||||
fmt.Printf("Duration: %s\n", result.Duration)
|
||||
}
|
||||
} else if compactAll {
|
||||
if result.DryRun {
|
||||
fmt.Printf("DRY RUN - Found %d candidates for Tier %d compaction\n", len(result.Results), compactTier)
|
||||
} else {
|
||||
successCount := 0
|
||||
for _, r := range result.Results {
|
||||
if r.Success {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
fmt.Printf("Compacted %d/%d issues in %s\n", successCount, len(result.Results), result.Duration)
|
||||
for _, r := range result.Results {
|
||||
if r.Success {
|
||||
fmt.Printf(" ✓ %s: %d → %d bytes (%s)\n", r.IssueID, r.OriginalSize, r.CompactedSize, r.Reduction)
|
||||
} else {
|
||||
fmt.Printf(" ✗ %s: %s\n", r.IssueID, r.Error)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runCompactStatsRPC(ctx context.Context) {
|
||||
args := map[string]interface{}{
|
||||
"tier": compactTier,
|
||||
}
|
||||
|
||||
resp, err := daemonClient.Execute("compact_stats", args)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if !resp.Success {
|
||||
fmt.Fprintf(os.Stderr, "Error: %s\n", resp.Error)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if jsonOutput {
|
||||
fmt.Println(string(resp.Data))
|
||||
return
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Success bool `json:"success"`
|
||||
Stats struct {
|
||||
Tier1Candidates int `json:"tier1_candidates"`
|
||||
Tier2Candidates int `json:"tier2_candidates"`
|
||||
TotalClosed int `json:"total_closed"`
|
||||
Tier1MinAge string `json:"tier1_min_age"`
|
||||
Tier2MinAge string `json:"tier2_min_age"`
|
||||
EstimatedSavings string `json:"estimated_savings,omitempty"`
|
||||
} `json:"stats"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(resp.Data, &result); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error parsing response: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Printf("\nCompaction Statistics\n")
|
||||
fmt.Printf("=====================\n\n")
|
||||
fmt.Printf("Total closed issues: %d\n\n", result.Stats.TotalClosed)
|
||||
fmt.Printf("Tier 1 (30+ days closed, not compacted):\n")
|
||||
fmt.Printf(" Candidates: %d\n", result.Stats.Tier1Candidates)
|
||||
fmt.Printf(" Min age: %s\n\n", result.Stats.Tier1MinAge)
|
||||
fmt.Printf("Tier 2 (90+ days closed, Tier 1 compacted):\n")
|
||||
fmt.Printf(" Candidates: %d\n", result.Stats.Tier2Candidates)
|
||||
fmt.Printf(" Min age: %s\n", result.Stats.Tier2MinAge)
|
||||
}
|
||||
|
||||
func init() {
|
||||
compactCmd.Flags().BoolVar(&compactDryRun, "dry-run", false, "Preview without compacting")
|
||||
compactCmd.Flags().IntVar(&compactTier, "tier", 1, "Compaction tier (1 or 2)")
|
||||
|
||||
@@ -99,10 +99,14 @@ func (c *Client) Execute(operation string, args interface{}) (*Response, error)
|
||||
return nil, fmt.Errorf("failed to marshal args: %w", err)
|
||||
}
|
||||
|
||||
// Get current working directory for database routing
|
||||
cwd, _ := os.Getwd()
|
||||
|
||||
req := Request{
|
||||
Operation: operation,
|
||||
Args: argsJSON,
|
||||
ClientVersion: ClientVersion,
|
||||
Cwd: cwd,
|
||||
}
|
||||
|
||||
reqJSON, err := json.Marshal(req)
|
||||
|
||||
@@ -30,6 +30,8 @@ const (
|
||||
OpReposReady = "repos_ready"
|
||||
OpReposStats = "repos_stats"
|
||||
OpReposClearCache = "repos_clear_cache"
|
||||
OpCompact = "compact"
|
||||
OpCompactStats = "compact_stats"
|
||||
)
|
||||
|
||||
// Request represents an RPC request from client to daemon
|
||||
@@ -230,3 +232,53 @@ type ReposStatsResponse struct {
|
||||
PerRepo map[string]types.Statistics `json:"per_repo"`
|
||||
Errors map[string]string `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// CompactArgs represents arguments for the compact operation
|
||||
type CompactArgs struct {
|
||||
IssueID string `json:"issue_id,omitempty"` // Empty for --all
|
||||
Tier int `json:"tier"` // 1 or 2
|
||||
DryRun bool `json:"dry_run"`
|
||||
Force bool `json:"force"`
|
||||
All bool `json:"all"`
|
||||
APIKey string `json:"api_key,omitempty"`
|
||||
Workers int `json:"workers,omitempty"`
|
||||
BatchSize int `json:"batch_size,omitempty"`
|
||||
}
|
||||
|
||||
// CompactStatsArgs represents arguments for compact stats operation
|
||||
type CompactStatsArgs struct {
|
||||
Tier int `json:"tier,omitempty"`
|
||||
}
|
||||
|
||||
// CompactResponse represents the response from a compact operation
|
||||
type CompactResponse struct {
|
||||
Success bool `json:"success"`
|
||||
IssueID string `json:"issue_id,omitempty"`
|
||||
Results []CompactResult `json:"results,omitempty"` // For batch operations
|
||||
Stats *CompactStatsData `json:"stats,omitempty"` // For stats operation
|
||||
OriginalSize int `json:"original_size,omitempty"`
|
||||
CompactedSize int `json:"compacted_size,omitempty"`
|
||||
Reduction string `json:"reduction,omitempty"`
|
||||
Duration string `json:"duration,omitempty"`
|
||||
DryRun bool `json:"dry_run,omitempty"`
|
||||
}
|
||||
|
||||
// CompactResult represents the result of compacting a single issue
|
||||
type CompactResult struct {
|
||||
IssueID string `json:"issue_id"`
|
||||
Success bool `json:"success"`
|
||||
Error string `json:"error,omitempty"`
|
||||
OriginalSize int `json:"original_size,omitempty"`
|
||||
CompactedSize int `json:"compacted_size,omitempty"`
|
||||
Reduction string `json:"reduction,omitempty"`
|
||||
}
|
||||
|
||||
// CompactStatsData represents compaction statistics
|
||||
type CompactStatsData struct {
|
||||
Tier1Candidates int `json:"tier1_candidates"`
|
||||
Tier2Candidates int `json:"tier2_candidates"`
|
||||
TotalClosed int `json:"total_closed"`
|
||||
Tier1MinAge string `json:"tier1_min_age"`
|
||||
Tier2MinAge string `json:"tier2_min_age"`
|
||||
EstimatedSavings string `json:"estimated_savings,omitempty"`
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/compact"
|
||||
"github.com/steveyegge/beads/internal/storage"
|
||||
"github.com/steveyegge/beads/internal/storage/sqlite"
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
@@ -579,6 +580,10 @@ func (s *Server) handleRequest(req *Request) Response {
|
||||
resp = s.handleReposStats(req)
|
||||
case OpReposClearCache:
|
||||
resp = s.handleReposClearCache(req)
|
||||
case OpCompact:
|
||||
resp = s.handleCompact(req)
|
||||
case OpCompactStats:
|
||||
resp = s.handleCompactStats(req)
|
||||
default:
|
||||
s.metrics.RecordError(req.Operation)
|
||||
return Response{
|
||||
@@ -1599,3 +1604,279 @@ func (s *Server) handleReposClearCache(_ *Request) Response {
|
||||
Data: json.RawMessage(`{"message":"Cache cleared successfully"}`),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleCompact(req *Request) Response {
|
||||
var args CompactArgs
|
||||
if err := json.Unmarshal(req.Args, &args); err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("invalid compact args: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
store, err := s.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get storage: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
||||
if !ok {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: "compact requires SQLite storage",
|
||||
}
|
||||
}
|
||||
|
||||
config := &compact.CompactConfig{
|
||||
APIKey: args.APIKey,
|
||||
Concurrency: args.Workers,
|
||||
DryRun: args.DryRun,
|
||||
}
|
||||
if config.Concurrency <= 0 {
|
||||
config.Concurrency = 5
|
||||
}
|
||||
|
||||
compactor, err := compact.New(sqliteStore, args.APIKey, config)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to create compactor: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
ctx := s.reqCtx(req)
|
||||
startTime := time.Now()
|
||||
|
||||
if args.IssueID != "" {
|
||||
if !args.Force {
|
||||
eligible, reason, err := sqliteStore.CheckEligibility(ctx, args.IssueID, args.Tier)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to check eligibility: %v", err),
|
||||
}
|
||||
}
|
||||
if !eligible {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("%s is not eligible for Tier %d compaction: %s", args.IssueID, args.Tier, reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
issue, err := sqliteStore.GetIssue(ctx, args.IssueID)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get issue: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
originalSize := len(issue.Description) + len(issue.Design) + len(issue.Notes) + len(issue.AcceptanceCriteria)
|
||||
|
||||
if args.DryRun {
|
||||
result := CompactResponse{
|
||||
Success: true,
|
||||
IssueID: args.IssueID,
|
||||
OriginalSize: originalSize,
|
||||
Reduction: "70-80%",
|
||||
DryRun: true,
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
if args.Tier == 1 {
|
||||
err = compactor.CompactTier1(ctx, args.IssueID)
|
||||
} else {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: "Tier 2 compaction not yet implemented",
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("compaction failed: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
issueAfter, _ := sqliteStore.GetIssue(ctx, args.IssueID)
|
||||
compactedSize := 0
|
||||
if issueAfter != nil {
|
||||
compactedSize = len(issueAfter.Description)
|
||||
}
|
||||
|
||||
duration := time.Since(startTime)
|
||||
result := CompactResponse{
|
||||
Success: true,
|
||||
IssueID: args.IssueID,
|
||||
OriginalSize: originalSize,
|
||||
CompactedSize: compactedSize,
|
||||
Reduction: fmt.Sprintf("%.1f%%", float64(originalSize-compactedSize)/float64(originalSize)*100),
|
||||
Duration: duration.String(),
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
if args.All {
|
||||
var candidates []*sqlite.CompactionCandidate
|
||||
|
||||
if args.Tier == 1 {
|
||||
tier1, err := sqliteStore.GetTier1Candidates(ctx)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get Tier 1 candidates: %v", err),
|
||||
}
|
||||
}
|
||||
candidates = tier1
|
||||
} else if args.Tier == 2 {
|
||||
tier2, err := sqliteStore.GetTier2Candidates(ctx)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get Tier 2 candidates: %v", err),
|
||||
}
|
||||
}
|
||||
candidates = tier2
|
||||
} else {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("invalid tier: %d (must be 1 or 2)", args.Tier),
|
||||
}
|
||||
}
|
||||
|
||||
if len(candidates) == 0 {
|
||||
result := CompactResponse{
|
||||
Success: true,
|
||||
Results: []CompactResult{},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
issueIDs := make([]string, len(candidates))
|
||||
for i, c := range candidates {
|
||||
issueIDs[i] = c.IssueID
|
||||
}
|
||||
|
||||
batchResults, err := compactor.CompactTier1Batch(ctx, issueIDs)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("batch compaction failed: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
results := make([]CompactResult, 0, len(batchResults))
|
||||
for _, r := range batchResults {
|
||||
result := CompactResult{
|
||||
IssueID: r.IssueID,
|
||||
Success: r.Err == nil,
|
||||
OriginalSize: r.OriginalSize,
|
||||
CompactedSize: r.CompactedSize,
|
||||
}
|
||||
if r.Err != nil {
|
||||
result.Error = r.Err.Error()
|
||||
} else if r.OriginalSize > 0 && r.CompactedSize > 0 {
|
||||
result.Reduction = fmt.Sprintf("%.1f%%", float64(r.OriginalSize-r.CompactedSize)/float64(r.OriginalSize)*100)
|
||||
}
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
duration := time.Since(startTime)
|
||||
response := CompactResponse{
|
||||
Success: true,
|
||||
Results: results,
|
||||
Duration: duration.String(),
|
||||
DryRun: args.DryRun,
|
||||
}
|
||||
data, _ := json.Marshal(response)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: "must specify --all or --id",
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleCompactStats(req *Request) Response {
|
||||
var args CompactStatsArgs
|
||||
if err := json.Unmarshal(req.Args, &args); err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("invalid compact stats args: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
store, err := s.getStorageForRequest(req)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get storage: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
||||
if !ok {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: "compact stats requires SQLite storage",
|
||||
}
|
||||
}
|
||||
|
||||
ctx := s.reqCtx(req)
|
||||
|
||||
tier1, err := sqliteStore.GetTier1Candidates(ctx)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get Tier 1 candidates: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
tier2, err := sqliteStore.GetTier2Candidates(ctx)
|
||||
if err != nil {
|
||||
return Response{
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("failed to get Tier 2 candidates: %v", err),
|
||||
}
|
||||
}
|
||||
|
||||
stats := CompactStatsData{
|
||||
Tier1Candidates: len(tier1),
|
||||
Tier2Candidates: len(tier2),
|
||||
Tier1MinAge: "30 days",
|
||||
Tier2MinAge: "90 days",
|
||||
TotalClosed: 0, // Could query for this but not critical
|
||||
}
|
||||
|
||||
result := CompactResponse{
|
||||
Success: true,
|
||||
Stats: &stats,
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
return Response{
|
||||
Success: true,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user