diff --git a/internal/beads/beads.go b/internal/beads/beads.go index f89eb503..fa3fdfa1 100644 --- a/internal/beads/beads.go +++ b/internal/beads/beads.go @@ -1436,3 +1436,130 @@ func (b *Beads) AddGateWaiter(gateID, waiter string) error { } return nil } + +// ===== Merge Slot Functions (serialized conflict resolution) ===== + +// MergeSlotStatus represents the result of checking a merge slot. +type MergeSlotStatus struct { + ID string `json:"id"` + Available bool `json:"available"` + Holder string `json:"holder,omitempty"` + Waiters []string `json:"waiters,omitempty"` + Error string `json:"error,omitempty"` +} + +// MergeSlotCreate creates the merge slot bead for the current rig. +// The slot is used for serialized conflict resolution in the merge queue. +// Returns the slot ID if successful. +func (b *Beads) MergeSlotCreate() (string, error) { + out, err := b.run("merge-slot", "create", "--json") + if err != nil { + return "", fmt.Errorf("creating merge slot: %w", err) + } + + var result struct { + ID string `json:"id"` + Status string `json:"status"` + } + if err := json.Unmarshal(out, &result); err != nil { + return "", fmt.Errorf("parsing merge-slot create output: %w", err) + } + + return result.ID, nil +} + +// MergeSlotCheck checks the availability of the merge slot. +// Returns the current status including holder and waiters if held. +func (b *Beads) MergeSlotCheck() (*MergeSlotStatus, error) { + out, err := b.run("merge-slot", "check", "--json") + if err != nil { + // Check if slot doesn't exist + if strings.Contains(err.Error(), "not found") { + return &MergeSlotStatus{Error: "not found"}, nil + } + return nil, fmt.Errorf("checking merge slot: %w", err) + } + + var status MergeSlotStatus + if err := json.Unmarshal(out, &status); err != nil { + return nil, fmt.Errorf("parsing merge-slot check output: %w", err) + } + + return &status, nil +} + +// MergeSlotAcquire attempts to acquire the merge slot for exclusive access. +// If holder is empty, defaults to BD_ACTOR environment variable. +// If addWaiter is true and the slot is held, the requester is added to the waiters queue. +// Returns the acquisition result. +func (b *Beads) MergeSlotAcquire(holder string, addWaiter bool) (*MergeSlotStatus, error) { + args := []string{"merge-slot", "acquire", "--json"} + if holder != "" { + args = append(args, "--holder="+holder) + } + if addWaiter { + args = append(args, "--wait") + } + + out, err := b.run(args...) + if err != nil { + // Parse the output even on error - it may contain useful info + var status MergeSlotStatus + if jsonErr := json.Unmarshal(out, &status); jsonErr == nil { + return &status, nil + } + return nil, fmt.Errorf("acquiring merge slot: %w", err) + } + + var status MergeSlotStatus + if err := json.Unmarshal(out, &status); err != nil { + return nil, fmt.Errorf("parsing merge-slot acquire output: %w", err) + } + + return &status, nil +} + +// MergeSlotRelease releases the merge slot after conflict resolution completes. +// If holder is provided, it verifies the slot is held by that holder before releasing. +func (b *Beads) MergeSlotRelease(holder string) error { + args := []string{"merge-slot", "release", "--json"} + if holder != "" { + args = append(args, "--holder="+holder) + } + + out, err := b.run(args...) + if err != nil { + return fmt.Errorf("releasing merge slot: %w", err) + } + + var result struct { + Released bool `json:"released"` + Error string `json:"error,omitempty"` + } + if err := json.Unmarshal(out, &result); err != nil { + return fmt.Errorf("parsing merge-slot release output: %w", err) + } + + if !result.Released && result.Error != "" { + return fmt.Errorf("slot release failed: %s", result.Error) + } + + return nil +} + +// MergeSlotEnsureExists creates the merge slot if it doesn't exist. +// This is idempotent - safe to call multiple times. +func (b *Beads) MergeSlotEnsureExists() (string, error) { + // Check if slot exists first + status, err := b.MergeSlotCheck() + if err != nil { + return "", err + } + + if status.Error == "not found" { + // Create it + return b.MergeSlotCreate() + } + + return status.ID, nil +} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index 63767a85..995642e3 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -11,6 +11,7 @@ import ( "os" "os/exec" "path/filepath" + "strings" "time" "github.com/steveyegge/gastown/internal/beads" @@ -471,6 +472,20 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merged event: %v\n", err) } + // Release merge slot if this was a conflict resolution + // The slot is held while conflict resolution is in progress + holder := e.rig.Name + "/refinery" + if err := e.beads.MergeSlotRelease(holder); err != nil { + // Not an error if slot wasn't held - it's optional + // Only log if it seems like an actual issue + errStr := err.Error() + if !strings.Contains(errStr, "not held") && !strings.Contains(errStr, "not found") { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to release merge slot: %v\n", err) + } + } else { + fmt.Fprintf(e.output, "[Engineer] Released merge slot\n") + } + // 1. Close source issue with reference to MR if mr.SourceIssue != "" { closeReason := fmt.Sprintf("Merged in %s", mr.ID) @@ -551,7 +566,37 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) // Priority: inherit from original + boost (P2 -> P1) // Parent: original MR bead // Description: metadata including branch, conflict SHA, etc. +// +// Merge Slot Integration: +// Before creating a conflict resolution task, we acquire the merge-slot for this rig. +// This serializes conflict resolution - only one polecat can resolve conflicts at a time. +// If the slot is already held, we skip creating the task and let the MR stay in queue. +// When the current resolution completes and merges, the slot is released. func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) (string, error) { + // === MERGE SLOT GATE: Serialize conflict resolution === + // Ensure merge slot exists (idempotent) + slotID, err := e.beads.MergeSlotEnsureExists() + if err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: could not ensure merge slot: %v\n", err) + // Continue anyway - slot is optional for now + } else { + // Try to acquire the merge slot + holder := e.rig.Name + "/refinery" + status, err := e.beads.MergeSlotAcquire(holder, false) + if err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: could not acquire merge slot: %v\n", err) + // Continue anyway - slot is optional + } else if !status.Available && status.Holder != "" && status.Holder != holder { + // Slot is held by someone else - skip creating the task + // The MR stays in queue and will retry when slot is released + fmt.Fprintf(e.output, "[Engineer] Merge slot held by %s - deferring conflict resolution\n", status.Holder) + fmt.Fprintf(e.output, "[Engineer] MR %s will retry after current resolution completes\n", mr.ID) + return "", nil // Not an error - just deferred + } + // Either we acquired the slot, or status indicates we already hold it + fmt.Fprintf(e.output, "[Engineer] Acquired merge slot: %s\n", slotID) + } + // Get the current main SHA for conflict tracking mainSHA, err := e.git.Rev("origin/" + mr.Target) if err != nil {