feat(refinery): Add parallel worker support with MR claiming (gt-kgszr)

Implements Option A from the issue: multiple refinery workers with locking.

Changes to mrqueue:
- Add ClaimedBy/ClaimedAt fields to MR struct
- Add Claim(id, worker) method with atomic file operations
- Add Release(id) method to unclaim on failure
- Add ListUnclaimed() to find available work
- Add ListClaimedBy(worker) for worker-specific queries
- Claims expire after 10 minutes for crash recovery

New CLI commands:
- gt refinery claim <mr-id> - Claim MR for processing
- gt refinery release <mr-id> - Release claim back to queue
- gt refinery unclaimed - List available MRs

Formula updates:
- queue-scan now uses gt refinery unclaimed
- process-branch claims MR before processing
- handle-failures releases claim on test failure
- Claims prevent double-processing by parallel workers

Worker ID comes from GT_REFINERY_WORKER env var (default: refinery-1).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
toast
2026-01-01 18:46:29 -08:00
committed by Steve Yegge
parent e159489edb
commit 3ecaf9d6fe
3 changed files with 307 additions and 5 deletions

View File

@@ -101,17 +101,26 @@ needs = ["inbox-check"]
description = """ description = """
Check the beads merge queue - this is the SOURCE OF TRUTH for pending merges. Check the beads merge queue - this is the SOURCE OF TRUTH for pending merges.
**Parallel Refinery Support**: When multiple refinery workers run in parallel,
each must claim an MR before processing to avoid double-processing. Use the
claiming commands to coordinate.
```bash ```bash
git fetch --prune origin git fetch --prune origin
# List UNCLAIMED MRs (excludes those being processed by other workers)
gt refinery unclaimed
# Or list all MRs (includes claimed):
gt mq list <rig> gt mq list <rig>
``` ```
The beads MQ tracks all pending merge requests. Do NOT rely on `git branch -r | grep polecat` The beads MQ tracks all pending merge requests. Do NOT rely on `git branch -r | grep polecat`
as branches may exist without MR beads, or MR beads may exist for already-merged work. as branches may exist without MR beads, or MR beads may exist for already-merged work.
If queue empty, skip to context-check step. If queue empty (no unclaimed MRs), skip to context-check step.
For each MR in the queue, verify the branch still exists: For each MR in the unclaimed queue, verify the branch still exists:
```bash ```bash
git branch -r | grep <branch> git branch -r | grep <branch>
``` ```
@@ -127,8 +136,18 @@ id = "process-branch"
title = "Process next branch" title = "Process next branch"
needs = ["queue-scan"] needs = ["queue-scan"]
description = """ description = """
Pick next branch from queue. Rebase on current main. Pick next branch from queue. **Claim it first** to prevent other workers from
processing it simultaneously.
**Step 1: Claim the MR**
```bash
# Claim before processing (prevents double-processing by parallel workers)
gt refinery claim <mr-id>
```
If claim fails (already claimed by another worker), skip to next MR in queue.
**Step 2: Rebase on current main**
```bash ```bash
git checkout -b temp origin/<polecat-branch> git checkout -b temp origin/<polecat-branch>
git rebase origin/main git rebase origin/main
@@ -137,7 +156,11 @@ git rebase origin/main
If rebase conflicts and unresolvable: If rebase conflicts and unresolvable:
- git rebase --abort - git rebase --abort
- Notify polecat/witness to fix and resubmit - Notify polecat/witness to fix and resubmit
- Skip to loop-check for next branch""" - **Release the claim** so another worker can retry later: `gt refinery release <mr-id>`
- Skip to loop-check for next branch
**Note**: The claim automatically expires after 10 minutes if not processed
(for crash recovery). But always release explicitly on failure for faster retry."""
[[steps]] [[steps]]
id = "run-tests" id = "run-tests"
@@ -166,6 +189,7 @@ If tests FAILED:
2. If branch caused it: 2. If branch caused it:
- Abort merge - Abort merge
- Notify polecat: "Tests failing. Please fix and resubmit." - Notify polecat: "Tests failing. Please fix and resubmit."
- **Release the claim**: `gt refinery release <mr-id>`
- Skip to loop-check - Skip to loop-check
3. If pre-existing on main: 3. If pre-existing on main:
- Option A: Fix it yourself (you're the Engineer!) - Option A: Fix it yourself (you're the Engineer!)
@@ -176,7 +200,9 @@ If tests FAILED:
- Fix committed, OR - Fix committed, OR
- Bead filed for the failure - Bead filed for the failure
This is non-negotiable. Never disavow. Never "note and proceed." """ This is non-negotiable. Never disavow. Never "note and proceed."
**Note**: Always release the claim on failure so other workers can retry later."""
[[steps]] [[steps]]
id = "merge-push" id = "merge-push"

View File

@@ -6,6 +6,7 @@ import (
"os" "os"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/refinery" "github.com/steveyegge/gastown/internal/refinery"
"github.com/steveyegge/gastown/internal/rig" "github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/style" "github.com/steveyegge/gastown/internal/style"
@@ -115,6 +116,57 @@ Examples:
RunE: runRefineryRestart, RunE: runRefineryRestart,
} }
var refineryClaimCmd = &cobra.Command{
Use: "claim <mr-id>",
Short: "Claim an MR for processing",
Long: `Claim a merge request for processing by this refinery worker.
When running multiple refinery workers in parallel, each worker must claim
an MR before processing to prevent double-processing. Claims expire after
10 minutes if not processed (for crash recovery).
The worker ID is automatically determined from the GT_REFINERY_WORKER
environment variable, or defaults to "refinery-1".
Examples:
gt refinery claim gt-abc123
GT_REFINERY_WORKER=refinery-2 gt refinery claim gt-abc123`,
Args: cobra.ExactArgs(1),
RunE: runRefineryClaim,
}
var refineryReleaseCmd = &cobra.Command{
Use: "release <mr-id>",
Short: "Release a claimed MR back to the queue",
Long: `Release a claimed merge request back to the queue.
Called when processing fails and the MR should be retried by another worker.
This clears the claim so other workers can pick up the MR.
Examples:
gt refinery release gt-abc123`,
Args: cobra.ExactArgs(1),
RunE: runRefineryRelease,
}
var refineryUnclaimedCmd = &cobra.Command{
Use: "unclaimed [rig]",
Short: "List unclaimed MRs available for processing",
Long: `List merge requests that are available for claiming.
Shows MRs that are not currently claimed by any worker, or have stale
claims (worker may have crashed). Useful for parallel refinery workers
to find work.
Examples:
gt refinery unclaimed
gt refinery unclaimed --json`,
Args: cobra.MaximumNArgs(1),
RunE: runRefineryUnclaimed,
}
var refineryUnclaimedJSON bool
func init() { func init() {
// Start flags // Start flags
refineryStartCmd.Flags().BoolVar(&refineryForeground, "foreground", false, "Run in foreground (default: background)") refineryStartCmd.Flags().BoolVar(&refineryForeground, "foreground", false, "Run in foreground (default: background)")
@@ -125,6 +177,9 @@ func init() {
// Queue flags // Queue flags
refineryQueueCmd.Flags().BoolVar(&refineryQueueJSON, "json", false, "Output as JSON") refineryQueueCmd.Flags().BoolVar(&refineryQueueJSON, "json", false, "Output as JSON")
// Unclaimed flags
refineryUnclaimedCmd.Flags().BoolVar(&refineryUnclaimedJSON, "json", false, "Output as JSON")
// Add subcommands // Add subcommands
refineryCmd.AddCommand(refineryStartCmd) refineryCmd.AddCommand(refineryStartCmd)
refineryCmd.AddCommand(refineryStopCmd) refineryCmd.AddCommand(refineryStopCmd)
@@ -132,6 +187,9 @@ func init() {
refineryCmd.AddCommand(refineryStatusCmd) refineryCmd.AddCommand(refineryStatusCmd)
refineryCmd.AddCommand(refineryQueueCmd) refineryCmd.AddCommand(refineryQueueCmd)
refineryCmd.AddCommand(refineryAttachCmd) refineryCmd.AddCommand(refineryAttachCmd)
refineryCmd.AddCommand(refineryClaimCmd)
refineryCmd.AddCommand(refineryReleaseCmd)
refineryCmd.AddCommand(refineryUnclaimedCmd)
rootCmd.AddCommand(refineryCmd) rootCmd.AddCommand(refineryCmd)
} }
@@ -423,3 +481,92 @@ func runRefineryRestart(cmd *cobra.Command, args []string) error {
fmt.Printf(" %s\n", style.Dim.Render("Use 'gt refinery attach' to connect")) fmt.Printf(" %s\n", style.Dim.Render("Use 'gt refinery attach' to connect"))
return nil return nil
} }
// getWorkerID returns the refinery worker ID from environment or default.
func getWorkerID() string {
if id := os.Getenv("GT_REFINERY_WORKER"); id != "" {
return id
}
return "refinery-1"
}
func runRefineryClaim(cmd *cobra.Command, args []string) error {
mrID := args[0]
workerID := getWorkerID()
// Find the queue from current working directory
q, err := mrqueue.NewFromWorkdir(".")
if err != nil {
return fmt.Errorf("finding merge queue: %w", err)
}
if err := q.Claim(mrID, workerID); err != nil {
if err == mrqueue.ErrNotFound {
return fmt.Errorf("MR %s not found in queue", mrID)
}
if err == mrqueue.ErrAlreadyClaimed {
return fmt.Errorf("MR %s is already claimed by another worker", mrID)
}
return fmt.Errorf("claiming MR: %w", err)
}
fmt.Printf("%s Claimed %s for %s\n", style.Bold.Render("✓"), mrID, workerID)
return nil
}
func runRefineryRelease(cmd *cobra.Command, args []string) error {
mrID := args[0]
q, err := mrqueue.NewFromWorkdir(".")
if err != nil {
return fmt.Errorf("finding merge queue: %w", err)
}
if err := q.Release(mrID); err != nil {
return fmt.Errorf("releasing MR: %w", err)
}
fmt.Printf("%s Released %s back to queue\n", style.Bold.Render("✓"), mrID)
return nil
}
func runRefineryUnclaimed(cmd *cobra.Command, args []string) error {
rigName := ""
if len(args) > 0 {
rigName = args[0]
}
_, r, rigName, err := getRefineryManager(rigName)
if err != nil {
return err
}
q := mrqueue.New(r.Path)
unclaimed, err := q.ListUnclaimed()
if err != nil {
return fmt.Errorf("listing unclaimed MRs: %w", err)
}
// JSON output
if refineryUnclaimedJSON {
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
return enc.Encode(unclaimed)
}
// Human-readable output
fmt.Printf("%s Unclaimed MRs for '%s':\n\n", style.Bold.Render("📋"), rigName)
if len(unclaimed) == 0 {
fmt.Printf(" %s\n", style.Dim.Render("(none available)"))
return nil
}
for i, mr := range unclaimed {
priority := fmt.Sprintf("P%d", mr.Priority)
fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target)
fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker)
}
return nil
}

View File

@@ -27,6 +27,10 @@ type MR struct {
Priority int `json:"priority"` // Priority (lower = higher priority) Priority int `json:"priority"` // Priority (lower = higher priority)
CreatedAt time.Time `json:"created_at"` CreatedAt time.Time `json:"created_at"`
AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability) AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability)
// Claiming fields for parallel refinery workers
ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
} }
// Queue manages the MR storage. // Queue manages the MR storage.
@@ -182,3 +186,128 @@ func (q *Queue) Count() int {
func (q *Queue) Dir() string { func (q *Queue) Dir() string {
return q.dir return q.dir
} }
// ClaimStaleTimeout is how long before a claimed MR is considered stale.
// If a worker claims an MR but doesn't process it within this time,
// another worker can reclaim it.
const ClaimStaleTimeout = 10 * time.Minute
// Claim attempts to claim an MR for processing by a specific worker.
// Returns nil if successful, ErrAlreadyClaimed if another worker has it,
// or ErrNotFound if the MR doesn't exist.
// Uses atomic file operations to prevent race conditions.
func (q *Queue) Claim(id, workerID string) error {
path := filepath.Join(q.dir, id+".json")
// Read current state
mr, err := q.load(path)
if err != nil {
if os.IsNotExist(err) {
return ErrNotFound
}
return fmt.Errorf("loading MR: %w", err)
}
// Check if already claimed by another worker
if mr.ClaimedBy != "" && mr.ClaimedBy != workerID {
// Check if claim is stale (worker may have crashed)
if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
return ErrAlreadyClaimed
}
// Stale claim - allow reclaim
}
// Claim the MR
now := time.Now()
mr.ClaimedBy = workerID
mr.ClaimedAt = &now
// Write atomically
data, err := json.MarshalIndent(mr, "", " ")
if err != nil {
return fmt.Errorf("marshaling MR: %w", err)
}
// Write to temp file first, then rename (atomic on most filesystems)
tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("writing temp file: %w", err)
}
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath) // cleanup
return fmt.Errorf("renaming temp file: %w", err)
}
return nil
}
// Release releases a claimed MR back to the queue.
// Called when processing fails and the MR should be retried.
func (q *Queue) Release(id string) error {
path := filepath.Join(q.dir, id+".json")
mr, err := q.load(path)
if err != nil {
if os.IsNotExist(err) {
return nil // Already removed
}
return fmt.Errorf("loading MR: %w", err)
}
// Clear claim
mr.ClaimedBy = ""
mr.ClaimedAt = nil
data, err := json.MarshalIndent(mr, "", " ")
if err != nil {
return fmt.Errorf("marshaling MR: %w", err)
}
return os.WriteFile(path, data, 0644)
}
// ListUnclaimed returns MRs that are not claimed or have stale claims.
// Sorted by priority then creation time.
func (q *Queue) ListUnclaimed() ([]*MR, error) {
all, err := q.List()
if err != nil {
return nil, err
}
var unclaimed []*MR
for _, mr := range all {
if mr.ClaimedBy == "" {
unclaimed = append(unclaimed, mr)
continue
}
// Check if claim is stale
if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) >= ClaimStaleTimeout {
unclaimed = append(unclaimed, mr)
}
}
return unclaimed, nil
}
// ListClaimedBy returns MRs claimed by a specific worker.
func (q *Queue) ListClaimedBy(workerID string) ([]*MR, error) {
all, err := q.List()
if err != nil {
return nil, err
}
var claimed []*MR
for _, mr := range all {
if mr.ClaimedBy == workerID {
claimed = append(claimed, mr)
}
}
return claimed, nil
}
// Common errors for claiming
var (
ErrNotFound = fmt.Errorf("merge request not found")
ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker")
)