diff --git a/internal/cmd/done.go b/internal/cmd/done.go index 8bb3b362..9db409c1 100644 --- a/internal/cmd/done.go +++ b/internal/cmd/done.go @@ -10,7 +10,6 @@ import ( "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/git" "github.com/steveyegge/gastown/internal/mail" - "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/style" "github.com/steveyegge/gastown/internal/workspace" ) @@ -164,28 +163,6 @@ func runDone(cmd *cobra.Command, args []string) error { } mrID = mrIssue.ID - // Also submit to mrqueue so refinery can process it - // The mrqueue is the work queue the refinery polls; beads are the audit record - mq, err := mrqueue.NewFromWorkdir(cwd) - if err != nil { - // Non-fatal: bead was created, just warn about queue - style.PrintWarning("could not access merge queue: %v", err) - } else { - mqEntry := &mrqueue.MR{ - ID: mrID, - Branch: branch, - Target: target, - SourceIssue: issueID, - Worker: worker, - Rig: rigName, - Title: title, - Priority: priority, - } - if err := mq.Submit(mqEntry); err != nil { - style.PrintWarning("could not submit to merge queue: %v", err) - } - } - // Success output fmt.Printf("%s Work submitted to merge queue\n", style.Bold.Render("✓")) fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrID)) diff --git a/internal/cmd/mq_migrate.go b/internal/cmd/mq_migrate.go deleted file mode 100644 index 19284a96..00000000 --- a/internal/cmd/mq_migrate.go +++ /dev/null @@ -1,175 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - "strings" - - "github.com/spf13/cobra" - "github.com/steveyegge/gastown/internal/beads" - "github.com/steveyegge/gastown/internal/mrqueue" - "github.com/steveyegge/gastown/internal/style" - "github.com/steveyegge/gastown/internal/workspace" -) - -var ( - mqMigrateDryRun bool -) - -var mqMigrateCmd = &cobra.Command{ - Use: "migrate", - Short: "Migrate stale merge-request beads to the mrqueue", - Long: `Migrate existing merge-request beads to the mrqueue. - -This command finds merge-request beads that were created before the mrqueue -integration and adds them to the refinery's work queue (.beads/mq/). - -Use this to recover stale MRs that the refinery wasn't processing because -they only existed as beads. - -Examples: - gt mq migrate # Migrate all stale MRs - gt mq migrate --dry-run # Preview what would be migrated`, - RunE: runMqMigrate, -} - -func init() { - mqMigrateCmd.Flags().BoolVar(&mqMigrateDryRun, "dry-run", false, "Preview only, don't actually migrate") - mqCmd.AddCommand(mqMigrateCmd) -} - -func runMqMigrate(cmd *cobra.Command, args []string) error { - // Find workspace - townRoot, err := workspace.FindFromCwdOrError() - if err != nil { - return fmt.Errorf("not in a Gas Town workspace: %w", err) - } - - // Find current rig - rigName, _, err := findCurrentRig(townRoot) - if err != nil { - return err - } - - // Initialize beads - cwd, err := os.Getwd() - if err != nil { - return fmt.Errorf("getting current directory: %w", err) - } - bd := beads.New(cwd) - - // Initialize mrqueue - mq, err := mrqueue.NewFromWorkdir(cwd) - if err != nil { - return fmt.Errorf("accessing merge queue: %w", err) - } - - // Get existing mrqueue entries to avoid duplicates - existingMRs, err := mq.List() - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("listing existing queue: %w", err) - } - existingIDs := make(map[string]bool) - for _, mr := range existingMRs { - existingIDs[mr.ID] = true - } - - // List all open merge-request beads - // Note: beads.List() with default ListOptions filters for P0 only (Priority default is 0) - // So we use Priority: -1 to indicate no filter - allIssues, err := bd.List(beads.ListOptions{Priority: -1}) - if err != nil { - return fmt.Errorf("listing all beads: %w", err) - } - - // Filter for open merge-requests - var issues []*beads.Issue - for _, issue := range allIssues { - if issue.Type == "merge-request" && issue.Status == "open" { - issues = append(issues, issue) - } - } - - if len(issues) == 0 { - fmt.Println("No stale merge-request beads found.") - return nil - } - - // Filter to only those not already in mrqueue - var toMigrate []*beads.Issue - for _, issue := range issues { - if !existingIDs[issue.ID] { - toMigrate = append(toMigrate, issue) - } - } - - if len(toMigrate) == 0 { - fmt.Println("All merge-request beads already in mrqueue.") - return nil - } - - fmt.Printf("Found %d stale merge-request bead(s) to migrate:\n\n", len(toMigrate)) - - migrated := 0 - for _, issue := range toMigrate { - // Parse MR fields from bead description - mrFields := beads.ParseMRFields(issue) - if mrFields == nil { - fmt.Printf(" %s %s - skipping (no MR fields in description)\n", - style.Dim.Render("⚠"), issue.ID) - continue - } - - // Extract worker from description - worker := mrFields.Worker - if worker == "" { - // Try to extract from branch name - if strings.HasPrefix(mrFields.Branch, "polecat/") { - parts := strings.SplitN(mrFields.Branch, "/", 3) - if len(parts) >= 2 { - worker = parts[1] - } - } - } - - if mqMigrateDryRun { - fmt.Printf(" %s %s - %s (branch: %s, target: %s)\n", - style.Bold.Render("→"), issue.ID, issue.Title, - mrFields.Branch, mrFields.Target) - } else { - // Create mrqueue entry - mqEntry := &mrqueue.MR{ - ID: issue.ID, - Branch: mrFields.Branch, - Target: mrFields.Target, - SourceIssue: mrFields.SourceIssue, - Worker: worker, - Rig: rigName, - Title: issue.Title, - Priority: issue.Priority, - } - - if err := mq.Submit(mqEntry); err != nil { - fmt.Printf(" %s %s - failed: %v\n", - style.Dim.Render("✗"), issue.ID, err) - continue - } - - fmt.Printf(" %s %s - %s\n", - style.Bold.Render("✓"), issue.ID, issue.Title) - migrated++ - } - } - - fmt.Println() - if mqMigrateDryRun { - fmt.Printf("Dry run: would migrate %d merge-request(s)\n", len(toMigrate)) - fmt.Println("Run without --dry-run to perform migration.") - } else { - fmt.Printf("%s Migrated %d merge-request(s) to mrqueue\n", - style.Bold.Render("✓"), migrated) - fmt.Println("The refinery will process them on its next poll cycle.") - } - - return nil -} diff --git a/internal/cmd/mq_submit.go b/internal/cmd/mq_submit.go index 9d50b52e..cdc49eb7 100644 --- a/internal/cmd/mq_submit.go +++ b/internal/cmd/mq_submit.go @@ -11,7 +11,6 @@ import ( "github.com/spf13/cobra" "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/git" - "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/style" "github.com/steveyegge/gastown/internal/workspace" ) @@ -150,28 +149,6 @@ func runMqSubmit(cmd *cobra.Command, args []string) error { return fmt.Errorf("creating merge request bead: %w", err) } - // Also submit to mrqueue so refinery can process it - // The mrqueue is the work queue the refinery polls; beads are the audit record - mq, err := mrqueue.NewFromWorkdir(cwd) - if err != nil { - // Non-fatal: bead was created, just warn about queue - style.PrintWarning("could not access merge queue: %v", err) - } else { - mqEntry := &mrqueue.MR{ - ID: mrIssue.ID, - Branch: branch, - Target: target, - SourceIssue: issueID, - Worker: worker, - Rig: rigName, - Title: title, - Priority: priority, - } - if err := mq.Submit(mqEntry); err != nil { - style.PrintWarning("could not submit to merge queue: %v", err) - } - } - // Success output fmt.Printf("%s Submitted to merge queue\n", style.Bold.Render("✓")) fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrIssue.ID)) diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go deleted file mode 100644 index 7565caa0..00000000 --- a/internal/mrqueue/mrqueue.go +++ /dev/null @@ -1,214 +0,0 @@ -// Package mrqueue provides merge request queue storage. -// MRs are stored locally in .beads/mq/ and deleted after merge. -// This avoids sync overhead for transient MR state. -package mrqueue - -import ( - "crypto/rand" - "encoding/hex" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - "time" -) - -// MR represents a merge request in the queue. -type MR struct { - ID string `json:"id"` - Branch string `json:"branch"` // Source branch (e.g., "polecat/nux") - Target string `json:"target"` // Target branch (e.g., "main") - SourceIssue string `json:"source_issue"` // The work item being merged - Worker string `json:"worker"` // Who did the work - Rig string `json:"rig"` // Which rig - Title string `json:"title"` // MR title - Priority int `json:"priority"` // Priority (lower = higher priority) - CreatedAt time.Time `json:"created_at"` -} - -// Queue manages the MR storage. -type Queue struct { - dir string // .beads/mq/ directory -} - -// New creates a new MR queue for the given rig path. -func New(rigPath string) *Queue { - return &Queue{ - dir: filepath.Join(rigPath, ".beads", "mq"), - } -} - -// NewFromWorkdir creates a queue by finding the rig root from a working directory. -// It follows beads redirects to ensure all clones use the same shared mrqueue. -func NewFromWorkdir(workdir string) (*Queue, error) { - // Walk up to find .beads or rig root - dir := workdir - for { - beadsDir := filepath.Join(dir, ".beads") - if info, err := os.Stat(beadsDir); err == nil && info.IsDir() { - // Check for redirect and follow it - finalDir := resolveBeadsRedirect(beadsDir) - return &Queue{dir: filepath.Join(finalDir, "mq")}, nil - } - - parent := filepath.Dir(dir) - if parent == dir { - return nil, fmt.Errorf("could not find .beads directory from %s", workdir) - } - dir = parent - } -} - -// resolveBeadsRedirect follows beads redirect files to find the final directory. -// Returns the original dir if no redirect or on error. -func resolveBeadsRedirect(beadsDir string) string { - redirectPath := filepath.Join(beadsDir, "redirect") - data, err := os.ReadFile(redirectPath) - if err != nil { - return beadsDir // No redirect file - } - - target := strings.TrimSpace(string(data)) - if target == "" { - return beadsDir - } - - // Resolve relative path from beadsDir's parent - if !filepath.IsAbs(target) { - target = filepath.Join(filepath.Dir(beadsDir), target) - } - - // Clean and verify the target exists - target = filepath.Clean(target) - if info, err := os.Stat(target); err == nil && info.IsDir() { - return target - } - - return beadsDir // Target doesn't exist, use original -} - -// EnsureDir creates the MQ directory if it doesn't exist. -func (q *Queue) EnsureDir() error { - return os.MkdirAll(q.dir, 0755) -} - -// generateID creates a unique MR ID. -func generateID() string { - b := make([]byte, 4) - rand.Read(b) - return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(b)) -} - -// Submit adds a new MR to the queue. -func (q *Queue) Submit(mr *MR) error { - if err := q.EnsureDir(); err != nil { - return fmt.Errorf("creating mq directory: %w", err) - } - - if mr.ID == "" { - mr.ID = generateID() - } - if mr.CreatedAt.IsZero() { - mr.CreatedAt = time.Now() - } - - data, err := json.MarshalIndent(mr, "", " ") - if err != nil { - return fmt.Errorf("marshaling MR: %w", err) - } - - path := filepath.Join(q.dir, mr.ID+".json") - if err := os.WriteFile(path, data, 0644); err != nil { - return fmt.Errorf("writing MR file: %w", err) - } - - return nil -} - -// List returns all pending MRs, sorted by priority then creation time. -func (q *Queue) List() ([]*MR, error) { - entries, err := os.ReadDir(q.dir) - if err != nil { - if os.IsNotExist(err) { - return nil, nil // Empty queue - } - return nil, fmt.Errorf("reading mq directory: %w", err) - } - - var mrs []*MR - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { - continue - } - - mr, err := q.load(filepath.Join(q.dir, entry.Name())) - if err != nil { - continue // Skip malformed files - } - mrs = append(mrs, mr) - } - - // Sort by priority (lower first), then by creation time (older first) - sort.Slice(mrs, func(i, j int) bool { - if mrs[i].Priority != mrs[j].Priority { - return mrs[i].Priority < mrs[j].Priority - } - return mrs[i].CreatedAt.Before(mrs[j].CreatedAt) - }) - - return mrs, nil -} - -// Get retrieves a specific MR by ID. -func (q *Queue) Get(id string) (*MR, error) { - path := filepath.Join(q.dir, id+".json") - return q.load(path) -} - -// load reads an MR from a file path. -func (q *Queue) load(path string) (*MR, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - var mr MR - if err := json.Unmarshal(data, &mr); err != nil { - return nil, err - } - - return &mr, nil -} - -// Remove deletes an MR from the queue (after successful merge). -func (q *Queue) Remove(id string) error { - path := filepath.Join(q.dir, id+".json") - err := os.Remove(path) - if os.IsNotExist(err) { - return nil // Already removed - } - return err -} - -// Count returns the number of pending MRs. -func (q *Queue) Count() int { - entries, err := os.ReadDir(q.dir) - if err != nil { - return 0 - } - - count := 0 - for _, entry := range entries { - if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") { - count++ - } - } - return count -} - -// Dir returns the queue directory path. -func (q *Queue) Dir() string { - return q.dir -} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index 021cdeb0..862f1975 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -11,9 +11,7 @@ import ( "time" "github.com/steveyegge/gastown/internal/beads" - "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/git" - "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/rig" ) @@ -71,14 +69,10 @@ func DefaultMergeQueueConfig() *MergeQueueConfig { type Engineer struct { rig *rig.Rig beads *beads.Beads - mrQueue *mrqueue.Queue git *git.Git config *MergeQueueConfig workDir string output io.Writer // Output destination for user-facing messages - - // stopCh is used for graceful shutdown - stopCh chan struct{} } // NewEngineer creates a new Engineer for the given rig. @@ -86,12 +80,10 @@ func NewEngineer(r *rig.Rig) *Engineer { return &Engineer{ rig: r, beads: beads.New(r.Path), - mrQueue: mrqueue.New(r.Path), git: git.NewGit(r.Path), config: DefaultMergeQueueConfig(), workDir: r.Path, output: os.Stdout, - stopCh: make(chan struct{}), } } @@ -189,90 +181,6 @@ func (e *Engineer) Config() *MergeQueueConfig { return e.config } -// Run starts the Engineer main loop. It blocks until the context is cancelled -// or Stop() is called. Returns nil on graceful shutdown. -func (e *Engineer) Run(ctx context.Context) error { - if err := e.LoadConfig(); err != nil { - return fmt.Errorf("loading config: %w", err) - } - - if !e.config.Enabled { - return fmt.Errorf("merge queue is disabled in configuration") - } - - fmt.Fprintf(e.output, "[Engineer] Starting for rig %s (poll_interval=%s)\n", - e.rig.Name, e.config.PollInterval) - - ticker := time.NewTicker(e.config.PollInterval) - defer ticker.Stop() - - // Run one iteration immediately, then on ticker - if err := e.processOnce(ctx); err != nil { - fmt.Fprintf(e.output, "[Engineer] Error: %v\n", err) - } - - for { - select { - case <-ctx.Done(): - fmt.Fprintln(e.output, "[Engineer] Shutting down (context cancelled)") - return nil - case <-e.stopCh: - fmt.Fprintln(e.output, "[Engineer] Shutting down (stop signal)") - return nil - case <-ticker.C: - if err := e.processOnce(ctx); err != nil { - fmt.Fprintf(e.output, "[Engineer] Error: %v\n", err) - } - } - } -} - -// Stop signals the Engineer to stop processing. This is a non-blocking call. -func (e *Engineer) Stop() { - close(e.stopCh) -} - -// processOnce performs one iteration of the Engineer loop: -// 1. Query for ready merge-requests from wisp storage -// 2. If none, return (will try again on next tick) -// 3. Process the highest priority, oldest MR -func (e *Engineer) processOnce(ctx context.Context) error { - // Check context before starting - select { - case <-ctx.Done(): - return nil - default: - } - - // 1. Query MR queue (wisp storage - already sorted by priority/age) - pendingMRs, err := e.mrQueue.List() - if err != nil { - return fmt.Errorf("querying merge queue: %w", err) - } - - // 2. If empty, return - if len(pendingMRs) == 0 { - return nil - } - - // 3. Select highest priority, oldest MR (List already returns sorted) - mr := pendingMRs[0] - - fmt.Fprintf(e.output, "[Engineer] Processing: %s (%s)\n", mr.ID, mr.Title) - - // 4. Process MR - result := e.ProcessMRFromQueue(ctx, mr) - - // 5. Handle result - if result.Success { - e.handleSuccessFromQueue(mr, result) - } else { - e.handleFailureFromQueue(mr, result) - } - - return nil -} - // ProcessResult contains the result of processing a merge request. type ProcessResult struct { Success bool @@ -372,72 +280,3 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) { // Full failure handling (assign back to worker, labels) in gt-3x1.4 } - -// ProcessMRFromQueue processes a merge request from wisp queue. -func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult { - // Emit merge_started event - actor := fmt.Sprintf("%s/refinery", e.rig.Name) - _ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) - - // MR fields are directly on the struct (no parsing needed) - fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:") - fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch) - fmt.Fprintf(e.output, " Target: %s\n", mr.Target) - fmt.Fprintf(e.output, " Worker: %s\n", mr.Worker) - fmt.Fprintf(e.output, " Source: %s\n", mr.SourceIssue) - - // TODO: Actual merge implementation - // For now, return failure - actual implementation in gt-3x1.2 - return ProcessResult{ - Success: false, - Error: "ProcessMRFromQueue not fully implemented (see gt-3x1.2)", - } -} - -// handleSuccessFromQueue handles a successful merge from wisp queue. -func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) { - actor := fmt.Sprintf("%s/refinery", e.rig.Name) - - // Emit merged event - _ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) - - // 1. Close source issue with reference to MR - if mr.SourceIssue != "" { - closeReason := fmt.Sprintf("Merged in %s", mr.ID) - if err := e.beads.CloseWithReason(closeReason, mr.SourceIssue); err != nil { - fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err) - } else { - fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue) - } - } - - // 2. Delete source branch if configured (local only) - if e.config.DeleteMergedBranches && mr.Branch != "" { - if err := e.git.DeleteBranch(mr.Branch, true); err != nil { - fmt.Fprintf(e.output, "[Engineer] Warning: failed to delete branch %s: %v\n", mr.Branch, err) - } else { - fmt.Fprintf(e.output, "[Engineer] Deleted local branch: %s\n", mr.Branch) - } - } - - // 3. Remove MR from queue (ephemeral - just delete the file) - if err := e.mrQueue.Remove(mr.ID); err != nil { - fmt.Fprintf(e.output, "[Engineer] Warning: failed to remove MR from queue: %v\n", err) - } - - // 4. Log success - fmt.Fprintf(e.output, "[Engineer] ✓ Merged: %s (commit: %s)\n", mr.ID, result.MergeCommit) -} - -// handleFailureFromQueue handles a failed merge from wisp queue. -func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) { - actor := fmt.Sprintf("%s/refinery", e.rig.Name) - - // Emit merge_failed event - _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) - - // MR stays in queue for retry - no action needed on the file - // Log the failure - fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) - fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry") -} diff --git a/internal/refinery/engineer_test.go b/internal/refinery/engineer_test.go index bd8b010c..ae4adbf8 100644 --- a/internal/refinery/engineer_test.go +++ b/internal/refinery/engineer_test.go @@ -206,9 +206,6 @@ func TestNewEngineer(t *testing.T) { if e.config == nil { t.Error("expected config to be initialized with defaults") } - if e.stopCh == nil { - t.Error("expected stopCh to be initialized") - } } func TestEngineer_DeleteMergedBranchesConfig(t *testing.T) {