From 3ecaf9d6fe12cc0541c337483f28c2760f0c06ef Mon Sep 17 00:00:00 2001
From: toast
Date: Thu, 1 Jan 2026 18:46:29 -0800
Subject: [PATCH] feat(refinery): Add parallel worker support with MR claiming (gt-kgszr)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Implements Option A from the issue: multiple refinery workers with locking.

Changes to mrqueue:
- Add ClaimedBy/ClaimedAt fields to MR struct
- Add Claim(id, worker) method with atomic file operations
- Add Release(id) method to unclaim on failure
- Add ListUnclaimed() to find available work
- Add ListClaimedBy(worker) for worker-specific queries
- Claims expire after 10 minutes for crash recovery

New CLI commands:
- gt refinery claim <mr-id> - Claim MR for processing
- gt refinery release <mr-id> - Release claim back to queue
- gt refinery unclaimed - List available MRs

Formula updates:
- queue-scan now uses gt refinery unclaimed
- process-branch claims MR before processing
- handle-failures releases claim on test failure
- Claims prevent double-processing by parallel workers

Worker ID comes from GT_REFINERY_WORKER env var (default: refinery-1).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5
---
 .../formulas/mol-refinery-patrol.formula.toml |  36 ++++-
 internal/cmd/refinery.go                      | 147 ++++++++++++++++++
 internal/mrqueue/mrqueue.go                   | 129 +++++++++++++++
 3 files changed, 307 insertions(+), 5 deletions(-)

diff --git a/.beads/formulas/mol-refinery-patrol.formula.toml b/.beads/formulas/mol-refinery-patrol.formula.toml
index a968c27f..b96eaddd 100644
--- a/.beads/formulas/mol-refinery-patrol.formula.toml
+++ b/.beads/formulas/mol-refinery-patrol.formula.toml
@@ -101,17 +101,26 @@ needs = ["inbox-check"]
 description = """
 Check the beads merge queue - this is the SOURCE OF TRUTH for pending merges.
 
+**Parallel Refinery Support**: When multiple refinery workers run in parallel,
+each must claim an MR before processing to avoid double-processing. Use the
+claiming commands to coordinate.
+
 ```bash
 git fetch --prune origin
+
+# List UNCLAIMED MRs (excludes those being processed by other workers)
+gt refinery unclaimed
+
+# Or list all MRs (includes claimed):
 gt mq list
 ```
 
 The beads MQ tracks all pending merge requests. Do NOT rely on `git branch -r | grep polecat`
 as branches may exist without MR beads, or MR beads may exist for already-merged work.
 
-If queue empty, skip to context-check step.
+If queue empty (no unclaimed MRs), skip to context-check step.
 
-For each MR in the queue, verify the branch still exists:
+For each MR in the unclaimed queue, verify the branch still exists:
 ```bash
 git branch -r | grep <branch>
 ```
@@ -127,8 +136,18 @@ id = "process-branch"
 title = "Process next branch"
 needs = ["queue-scan"]
 description = """
-Pick next branch from queue. Rebase on current main.
+Pick next branch from queue. **Claim it first** to prevent other workers from
+processing it simultaneously.
 
+**Step 1: Claim the MR**
+```bash
+# Claim before processing (prevents double-processing by parallel workers)
+gt refinery claim <mr-id>
+```
+
+If claim fails (already claimed by another worker), skip to next MR in queue.
+
+**Step 2: Rebase on current main**
 ```bash
 git checkout -b temp origin/<branch>
 git rebase origin/main
@@ -137,7 +156,11 @@ git rebase origin/main
 If rebase conflicts and unresolvable:
 - git rebase --abort
 - Notify polecat/witness to fix and resubmit
-- Skip to loop-check for next branch"""
+- **Release the claim** so another worker can retry later: `gt refinery release <mr-id>`
+- Skip to loop-check for next branch
+
+**Note**: The claim automatically expires after 10 minutes if not processed
+(for crash recovery). But always release explicitly on failure for faster retry."""
 
 [[steps]]
 id = "run-tests"
@@ -166,6 +189,7 @@ If tests FAILED:
 2. If branch caused it:
    - Abort merge
    - Notify polecat: "Tests failing. Please fix and resubmit."
+   - **Release the claim**: `gt refinery release <mr-id>`
    - Skip to loop-check
 3. 
If pre-existing on main:
    - Option A: Fix it yourself (you're the Engineer!)
@@ -176,7 +200,9 @@ If tests FAILED:
 - Fix committed, OR
 - Bead filed for the failure
 
-This is non-negotiable. Never disavow. Never "note and proceed." """
+This is non-negotiable. Never disavow. Never "note and proceed."
+
+**Note**: Always release the claim on failure so other workers can retry later."""
 
 [[steps]]
 id = "merge-push"
diff --git a/internal/cmd/refinery.go b/internal/cmd/refinery.go
index 27aeaf63..19a9645c 100644
--- a/internal/cmd/refinery.go
+++ b/internal/cmd/refinery.go
@@ -6,6 +6,7 @@ import (
 	"os"
 
 	"github.com/spf13/cobra"
+	"github.com/steveyegge/gastown/internal/mrqueue"
 	"github.com/steveyegge/gastown/internal/refinery"
 	"github.com/steveyegge/gastown/internal/rig"
 	"github.com/steveyegge/gastown/internal/style"
@@ -115,6 +116,61 @@ Examples:
 	RunE: runRefineryRestart,
 }
 
+// refineryClaimCmd is "gt refinery claim <mr-id>"; it takes exactly one MR ID.
+var refineryClaimCmd = &cobra.Command{
+	Use:   "claim <mr-id>",
+	Short: "Claim an MR for processing",
+	Long: `Claim a merge request for processing by this refinery worker.
+
+When running multiple refinery workers in parallel, each worker must claim
+an MR before processing to prevent double-processing. Claims expire after
+10 minutes if not processed (for crash recovery).
+
+The worker ID is automatically determined from the GT_REFINERY_WORKER
+environment variable, or defaults to "refinery-1".
+
+Examples:
+  gt refinery claim gt-abc123
+  GT_REFINERY_WORKER=refinery-2 gt refinery claim gt-abc123`,
+	Args: cobra.ExactArgs(1),
+	RunE: runRefineryClaim,
+}
+
+// refineryReleaseCmd is "gt refinery release <mr-id>"; it takes exactly one MR ID.
+var refineryReleaseCmd = &cobra.Command{
+	Use:   "release <mr-id>",
+	Short: "Release a claimed MR back to the queue",
+	Long: `Release a claimed merge request back to the queue.
+
+Called when processing fails and the MR should be retried by another worker.
+This clears the claim so other workers can pick up the MR.
+
+Examples:
+  gt refinery release gt-abc123`,
+	Args: cobra.ExactArgs(1),
+	RunE: runRefineryRelease,
+}
+
+// refineryUnclaimedCmd is "gt refinery unclaimed [rig]"; the rig argument is optional.
+var refineryUnclaimedCmd = &cobra.Command{
+	Use:   "unclaimed [rig]",
+	Short: "List unclaimed MRs available for processing",
+	Long: `List merge requests that are available for claiming.
+
+Shows MRs that are not currently claimed by any worker, or have stale
+claims (worker may have crashed). Useful for parallel refinery workers
+to find work.
+
+Examples:
+  gt refinery unclaimed
+  gt refinery unclaimed --json`,
+	Args: cobra.MaximumNArgs(1),
+	RunE: runRefineryUnclaimed,
+}
+
+// refineryUnclaimedJSON backs the --json flag of "gt refinery unclaimed".
+var refineryUnclaimedJSON bool
+
 func init() {
 	// Start flags
 	refineryStartCmd.Flags().BoolVar(&refineryForeground, "foreground", false, "Run in foreground (default: background)")
@@ -125,6 +181,9 @@ func init() {
 	// Queue flags
 	refineryQueueCmd.Flags().BoolVar(&refineryQueueJSON, "json", false, "Output as JSON")
 
+	// Unclaimed flags
+	refineryUnclaimedCmd.Flags().BoolVar(&refineryUnclaimedJSON, "json", false, "Output as JSON")
+
 	// Add subcommands
 	refineryCmd.AddCommand(refineryStartCmd)
 	refineryCmd.AddCommand(refineryStopCmd)
@@ -132,6 +191,9 @@ func init() {
 	refineryCmd.AddCommand(refineryStatusCmd)
 	refineryCmd.AddCommand(refineryQueueCmd)
 	refineryCmd.AddCommand(refineryAttachCmd)
+	refineryCmd.AddCommand(refineryClaimCmd)
+	refineryCmd.AddCommand(refineryReleaseCmd)
+	refineryCmd.AddCommand(refineryUnclaimedCmd)
 
 	rootCmd.AddCommand(refineryCmd)
 }
@@ -423,3 +485,92 @@ func runRefineryRestart(cmd *cobra.Command, args []string) error {
 	fmt.Printf(" %s\n", style.Dim.Render("Use 'gt refinery attach' to connect"))
 	return nil
 }
+
+// getWorkerID returns the refinery worker ID from environment or default.
+func getWorkerID() string {
+	// Every worker that leaves GT_REFINERY_WORKER unset reports the same default
+	// ID, and Claim lets a worker re-claim an MR already held under its own ID,
+	// so parallel workers must each export a distinct value.
+	if id := os.Getenv("GT_REFINERY_WORKER"); id != "" {
+		return id
+	}
+	return "refinery-1"
+}
+
+// runRefineryClaim claims the MR named by args[0] on behalf of this worker
+// (see getWorkerID) and prints a confirmation on success.
+func runRefineryClaim(cmd *cobra.Command, args []string) error {
+	mrID := args[0]
+	workerID := getWorkerID()
+
+	// Find the queue from current working directory
+	q, err := mrqueue.NewFromWorkdir(".")
+	if err != nil {
+		return fmt.Errorf("finding merge queue: %w", err)
+	}
+
+	if err := q.Claim(mrID, workerID); err != nil {
+		// Claim returns these sentinels unwrapped, so direct comparison matches.
+		switch err {
+		case mrqueue.ErrNotFound:
+			return fmt.Errorf("MR %s not found in queue", mrID)
+		case mrqueue.ErrAlreadyClaimed:
+			return fmt.Errorf("MR %s is already claimed by another worker", mrID)
+		}
+		return fmt.Errorf("claiming MR: %w", err)
+	}
+
+	fmt.Printf("%s Claimed %s for %s\n", style.Bold.Render("✓"), mrID, workerID)
+	return nil
+}
+
+// runRefineryRelease clears the claim on the MR named by args[0].
+//
+// NOTE(review): Queue.Release returns nil when the MR file does not exist and
+// does not check who holds the claim, so this reports success for an unknown
+// ID and lets any worker release another worker's claim.
+func runRefineryRelease(cmd *cobra.Command, args []string) error {
+	mrID := args[0]
+
+	q, err := mrqueue.NewFromWorkdir(".")
+	if err != nil {
+		return fmt.Errorf("finding merge queue: %w", err)
+	}
+
+	if err := q.Release(mrID); err != nil {
+		return fmt.Errorf("releasing MR: %w", err)
+	}
+
+	fmt.Printf("%s Released %s back to queue\n", style.Bold.Render("✓"), mrID)
+	return nil
+}
+
+// runRefineryUnclaimed prints the MRs that Queue.ListUnclaimed reports for the
+// rig named by the optional args[0], as a JSON array when --json is set.
+func runRefineryUnclaimed(cmd *cobra.Command, args []string) error {
+	rigName := ""
+	if len(args) > 0 {
+		rigName = args[0]
+	}
+
+	_, r, rigName, err := getRefineryManager(rigName)
+	if err != nil {
+		return err
+	}
+
+	q := mrqueue.New(r.Path)
+	unclaimed, err := q.ListUnclaimed()
+	if err != nil {
+		return fmt.Errorf("listing unclaimed MRs: %w", err)
+	}
+
+	// JSON output
+	if refineryUnclaimedJSON {
+		// ListUnclaimed returns a nil slice when nothing is available, and
+		// encoding/json renders nil slices as "null". Emit "[]" instead so
+		// consumers can always iterate the result.
+		if unclaimed == nil {
+			unclaimed = []*mrqueue.MR{}
+		}
+		enc := json.NewEncoder(os.Stdout)
+		enc.SetIndent("", " ")
+		return enc.Encode(unclaimed)
+	}
+
+	// Human-readable output
+	fmt.Printf("%s Unclaimed MRs for '%s':\n\n", style.Bold.Render("📋"), rigName)
+
+	if len(unclaimed) == 0 {
+		fmt.Printf(" %s\n", style.Dim.Render("(none available)"))
+		return nil
+	}
+
+	for i, mr := range unclaimed {
+		priority := fmt.Sprintf("P%d", mr.Priority)
+		fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target)
+		fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker)
+	}
+
+	return nil
+}
diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go
index 0e9ee411..c2037915 100644
--- a/internal/mrqueue/mrqueue.go
+++ b/internal/mrqueue/mrqueue.go
@@ -27,6 +27,10 @@ type MR struct {
 	Priority  int       `json:"priority"`             // Priority (lower = higher priority)
 	CreatedAt time.Time `json:"created_at"`
 	AgentBead string    `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability)
+
+	// Claiming fields for parallel refinery workers
+	ClaimedBy string     `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
+	ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
 }
 
 // Queue manages the MR storage.
@@ -182,3 +186,128 @@ func (q *Queue) Count() int {
 func (q *Queue) Dir() string {
 	return q.dir
 }
+
+// ClaimStaleTimeout is how long before a claimed MR is considered stale.
+// If a worker claims an MR but doesn't process it within this time,
+// another worker can reclaim it.
+const ClaimStaleTimeout = 10 * time.Minute
+
+// Claim attempts to claim an MR for processing by a specific worker.
+// Returns nil if successful, ErrAlreadyClaimed if another worker holds a
+// claim that is not yet stale, or ErrNotFound if the MR doesn't exist.
+// The updated record is written to a temp file and renamed into place.
+func (q *Queue) Claim(id, workerID string) error {
+	path := filepath.Join(q.dir, id+".json")
+
+	// Serialize the read-check-write sequence across worker processes. Without
+	// this, two workers can both read the MR as unclaimed and both "win".
+	unlock, err := lockMRFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return ErrNotFound // queue directory is missing, so the MR is too
+		}
+		return fmt.Errorf("locking MR: %w", err)
+	}
+	defer unlock()
+
+	// Read current state (under the lock, so it cannot change before we write)
+	mr, err := q.load(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return ErrNotFound
+		}
+		return fmt.Errorf("loading MR: %w", err)
+	}
+
+	// A live claim held by a different worker blocks us. A claim with no
+	// timestamp, or one at least ClaimStaleTimeout old, is treated as abandoned
+	// (the worker may have crashed) and can be taken over.
+	heldByOther := mr.ClaimedBy != "" && mr.ClaimedBy != workerID
+	if heldByOther && mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
+		return ErrAlreadyClaimed
+	}
+
+	// Claim the MR (re-claiming under the same worker ID refreshes the timestamp)
+	now := time.Now()
+	mr.ClaimedBy = workerID
+	mr.ClaimedAt = &now
+
+	return writeMRFile(path, mr)
+}
+
+// Release releases a claimed MR back to the queue.
+// Called when processing fails and the MR should be retried.
+// Releasing an MR that no longer exists is a no-op. The caller is not checked
+// against the current claimant.
+func (q *Queue) Release(id string) error {
+	path := filepath.Join(q.dir, id+".json")
+
+	// Take the same lock as Claim so a release cannot interleave with a claim.
+	unlock, err := lockMRFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil // queue directory is gone, nothing to release
+		}
+		return fmt.Errorf("locking MR: %w", err)
+	}
+	defer unlock()
+
+	mr, err := q.load(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil // Already removed
+		}
+		return fmt.Errorf("loading MR: %w", err)
+	}
+
+	// Clear claim
+	mr.ClaimedBy = ""
+	mr.ClaimedAt = nil
+
+	// Write via temp file + rename, like Claim, so a concurrent reader never
+	// observes a truncated or half-written record.
+	return writeMRFile(path, mr)
+}
+
+// Lock tuning shared by Claim and Release.
+const (
+	mrLockRetryInterval = 20 * time.Millisecond // pause between lock attempts
+	mrLockMaxWait       = 2 * time.Second       // give up after waiting this long
+	mrLockStaleAfter    = 30 * time.Second      // older lock files are assumed orphaned
+)
+
+// lockMRFile takes an exclusive cross-process lock for the MR file at path by
+// creating path+".lock" with O_EXCL, and returns a func that drops the lock.
+// It retries for up to mrLockMaxWait. A lock file older than mrLockStaleAfter
+// is assumed to belong to a crashed process and is removed.
+//
+// NOTE(review): the lock file lives in the queue directory next to the
+// existing ".tmp" files; this assumes List only reads names ending in ".json".
+func lockMRFile(path string) (func(), error) {
+	lockPath := path + ".lock"
+	deadline := time.Now().Add(mrLockMaxWait)
+
+	for {
+		f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
+		if err == nil {
+			_ = f.Close() // only the file's existence matters
+			return func() { _ = os.Remove(lockPath) }, nil
+		}
+		if !os.IsExist(err) {
+			return nil, err
+		}
+
+		// Someone else holds the lock. Break it if it is clearly orphaned;
+		// the retry still has to win the O_EXCL create.
+		if info, statErr := os.Stat(lockPath); statErr == nil && time.Since(info.ModTime()) > mrLockStaleAfter {
+			if os.Remove(lockPath) == nil {
+				continue
+			}
+		}
+
+		if time.Now().After(deadline) {
+			return nil, fmt.Errorf("timed out waiting for lock %s", lockPath)
+		}
+		time.Sleep(mrLockRetryInterval)
+	}
+}
+
+// writeMRFile marshals mr as indented JSON and replaces the file at path by
+// writing path+".tmp" and renaming it over path. Callers hold the MR lock, so
+// the fixed temp name cannot collide between workers.
+func writeMRFile(path string, mr interface{}) error {
+	data, err := json.MarshalIndent(mr, "", " ")
+	if err != nil {
+		return fmt.Errorf("marshaling MR: %w", err)
+	}
+
+	tmpPath := path + ".tmp"
+	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
+		return fmt.Errorf("writing temp file: %w", err)
+	}
+	if err := os.Rename(tmpPath, path); err != nil {
+		_ = os.Remove(tmpPath) // best-effort cleanup; the rename error is what matters
+		return fmt.Errorf("renaming temp file: %w", err)
+	}
+	return nil
+}
+
+// ListUnclaimed returns MRs that are not claimed or have stale claims.
+// Sorted by priority then creation time.
+func (q *Queue) ListUnclaimed() ([]*MR, error) {
+	all, err := q.List()
+	if err != nil {
+		return nil, err
+	}
+
+	var unclaimed []*MR
+	for _, mr := range all {
+		// Available when nobody holds it, or when the holder's claim is stale.
+		// A claim without a timestamp can never age out, so it counts as stale;
+		// Claim applies the same rule, which keeps "listed here" and
+		// "claimable" in agreement.
+		stale := mr.ClaimedAt == nil || time.Since(*mr.ClaimedAt) >= ClaimStaleTimeout
+		if mr.ClaimedBy == "" || stale {
+			unclaimed = append(unclaimed, mr)
+		}
+	}
+
+	return unclaimed, nil
+}
+
+// ListClaimedBy returns MRs claimed by a specific worker.
+// The match is on ClaimedBy only, so MRs whose claim has gone stale are
+// included. The result keeps the order returned by List.
+func (q *Queue) ListClaimedBy(workerID string) ([]*MR, error) {
+	all, err := q.List()
+	if err != nil {
+		return nil, err
+	}
+
+	var claimed []*MR
+	for _, mr := range all {
+		if mr.ClaimedBy == workerID {
+			claimed = append(claimed, mr)
+		}
+	}
+
+	return claimed, nil
+}
+
+// Common errors for claiming
+var (
+	// ErrNotFound is returned by Claim when the MR file does not exist.
+	ErrNotFound = fmt.Errorf("merge request not found")
+	// ErrAlreadyClaimed is returned by Claim when a different worker holds a
+	// claim younger than ClaimStaleTimeout.
+	ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker")
+)