feat(refinery): Integrate merge-slot gate for conflict resolution

Adds merge-slot integration to the Refinery's Engineer for serializing
conflict resolution. When a conflict is detected:
- Acquire the merge slot before creating a conflict resolution task
- If slot is held, defer task creation (MR stays in queue)
- Release slot after successful merge

This prevents cascading conflicts from multiple polecats racing to
resolve conflicts simultaneously.

Adds MergeSlot wrapper functions to beads package for slot operations.

(gt-4u49x)

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
furiosa
2026-01-02 18:03:45 -08:00
committed by Steve Yegge
parent f7839c2724
commit efd9434ee1
2 changed files with 172 additions and 0 deletions

View File

@@ -1436,3 +1436,130 @@ func (b *Beads) AddGateWaiter(gateID, waiter string) error {
}
return nil
}
// ===== Merge Slot Functions (serialized conflict resolution) =====
// MergeSlotStatus represents the result of checking a merge slot.
type MergeSlotStatus struct {
ID string `json:"id"`
Available bool `json:"available"`
Holder string `json:"holder,omitempty"`
Waiters []string `json:"waiters,omitempty"`
Error string `json:"error,omitempty"`
}
// MergeSlotCreate creates the merge slot bead for the current rig.
// The slot is used for serialized conflict resolution in the merge queue.
// Returns the slot ID if successful.
func (b *Beads) MergeSlotCreate() (string, error) {
out, err := b.run("merge-slot", "create", "--json")
if err != nil {
return "", fmt.Errorf("creating merge slot: %w", err)
}
var result struct {
ID string `json:"id"`
Status string `json:"status"`
}
if err := json.Unmarshal(out, &result); err != nil {
return "", fmt.Errorf("parsing merge-slot create output: %w", err)
}
return result.ID, nil
}
// MergeSlotCheck checks the availability of the merge slot.
// Returns the current status including holder and waiters if held.
func (b *Beads) MergeSlotCheck() (*MergeSlotStatus, error) {
out, err := b.run("merge-slot", "check", "--json")
if err != nil {
// Check if slot doesn't exist
if strings.Contains(err.Error(), "not found") {
return &MergeSlotStatus{Error: "not found"}, nil
}
return nil, fmt.Errorf("checking merge slot: %w", err)
}
var status MergeSlotStatus
if err := json.Unmarshal(out, &status); err != nil {
return nil, fmt.Errorf("parsing merge-slot check output: %w", err)
}
return &status, nil
}
// MergeSlotAcquire attempts to acquire the merge slot for exclusive access.
// If holder is empty, defaults to BD_ACTOR environment variable.
// If addWaiter is true and the slot is held, the requester is added to the waiters queue.
// Returns the acquisition result.
func (b *Beads) MergeSlotAcquire(holder string, addWaiter bool) (*MergeSlotStatus, error) {
args := []string{"merge-slot", "acquire", "--json"}
if holder != "" {
args = append(args, "--holder="+holder)
}
if addWaiter {
args = append(args, "--wait")
}
out, err := b.run(args...)
if err != nil {
// Parse the output even on error - it may contain useful info
var status MergeSlotStatus
if jsonErr := json.Unmarshal(out, &status); jsonErr == nil {
return &status, nil
}
return nil, fmt.Errorf("acquiring merge slot: %w", err)
}
var status MergeSlotStatus
if err := json.Unmarshal(out, &status); err != nil {
return nil, fmt.Errorf("parsing merge-slot acquire output: %w", err)
}
return &status, nil
}
// MergeSlotRelease releases the merge slot after conflict resolution completes.
// If holder is provided, it verifies the slot is held by that holder before releasing.
func (b *Beads) MergeSlotRelease(holder string) error {
args := []string{"merge-slot", "release", "--json"}
if holder != "" {
args = append(args, "--holder="+holder)
}
out, err := b.run(args...)
if err != nil {
return fmt.Errorf("releasing merge slot: %w", err)
}
var result struct {
Released bool `json:"released"`
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(out, &result); err != nil {
return fmt.Errorf("parsing merge-slot release output: %w", err)
}
if !result.Released && result.Error != "" {
return fmt.Errorf("slot release failed: %s", result.Error)
}
return nil
}
// MergeSlotEnsureExists creates the merge slot if it doesn't exist.
// This is idempotent - safe to call multiple times.
func (b *Beads) MergeSlotEnsureExists() (string, error) {
// Check if slot exists first
status, err := b.MergeSlotCheck()
if err != nil {
return "", err
}
if status.Error == "not found" {
// Create it
return b.MergeSlotCreate()
}
return status.ID, nil
}

View File

@@ -11,6 +11,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/steveyegge/gastown/internal/beads"
@@ -471,6 +472,20 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult)
fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merged event: %v\n", err)
}
// Release merge slot if this was a conflict resolution
// The slot is held while conflict resolution is in progress
holder := e.rig.Name + "/refinery"
if err := e.beads.MergeSlotRelease(holder); err != nil {
// Not an error if slot wasn't held - it's optional
// Only log if it seems like an actual issue
errStr := err.Error()
if !strings.Contains(errStr, "not held") && !strings.Contains(errStr, "not found") {
fmt.Fprintf(e.output, "[Engineer] Warning: failed to release merge slot: %v\n", err)
}
} else {
fmt.Fprintf(e.output, "[Engineer] Released merge slot\n")
}
// 1. Close source issue with reference to MR
if mr.SourceIssue != "" {
closeReason := fmt.Sprintf("Merged in %s", mr.ID)
@@ -551,7 +566,37 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
// Priority: inherit from original + boost (P2 -> P1)
// Parent: original MR bead
// Description: metadata including branch, conflict SHA, etc.
//
// Merge Slot Integration:
// Before creating a conflict resolution task, we acquire the merge-slot for this rig.
// This serializes conflict resolution - only one polecat can resolve conflicts at a time.
// If the slot is already held, we skip creating the task and let the MR stay in queue.
// When the current resolution completes and merges, the slot is released.
func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) (string, error) {
// === MERGE SLOT GATE: Serialize conflict resolution ===
// Ensure merge slot exists (idempotent)
slotID, err := e.beads.MergeSlotEnsureExists()
if err != nil {
fmt.Fprintf(e.output, "[Engineer] Warning: could not ensure merge slot: %v\n", err)
// Continue anyway - slot is optional for now
} else {
// Try to acquire the merge slot
holder := e.rig.Name + "/refinery"
status, err := e.beads.MergeSlotAcquire(holder, false)
if err != nil {
fmt.Fprintf(e.output, "[Engineer] Warning: could not acquire merge slot: %v\n", err)
// Continue anyway - slot is optional
} else if !status.Available && status.Holder != "" && status.Holder != holder {
// Slot is held by someone else - skip creating the task
// The MR stays in queue and will retry when slot is released
fmt.Fprintf(e.output, "[Engineer] Merge slot held by %s - deferring conflict resolution\n", status.Holder)
fmt.Fprintf(e.output, "[Engineer] MR %s will retry after current resolution completes\n", mr.ID)
return "", nil // Not an error - just deferred
}
// Either we acquired the slot, or status indicates we already hold it
fmt.Fprintf(e.output, "[Engineer] Acquired merge slot: %s\n", slotID)
}
// Get the current main SHA for conflict tracking
mainSHA, err := e.git.Rev("origin/" + mr.Target)
if err != nil {