feat(refinery): Non-blocking delegation via bead-gates (gt-hibbj)
When merge conflicts occur, the Refinery now creates a conflict resolution task and blocks the MR on that task, allowing the queue to continue to the next MR without waiting. Changes: - Add BlockedBy field to mrqueue.MR for tracking blocking tasks - Update handleFailureFromQueue to set BlockedBy after creating conflict task - Add ListReady method to mrqueue that filters out blocked MRs - Add ListBlocked method for monitoring blocked MRs - Add IsBeadOpen, ListReadyMRs, ListBlockedMRs helpers to Engineer - Add 'gt refinery ready' command (unclaimed AND unblocked MRs) - Add 'gt refinery blocked' command (shows blocked MRs) When the conflict resolution task closes, the MR unblocks and re-enters the ready queue for processing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -168,6 +168,43 @@ Examples:
|
||||
|
||||
var refineryUnclaimedJSON bool
|
||||
|
||||
var refineryReadyCmd = &cobra.Command{
|
||||
Use: "ready [rig]",
|
||||
Short: "List MRs ready for processing (unclaimed and unblocked)",
|
||||
Long: `List merge requests ready for processing.
|
||||
|
||||
Shows MRs that are:
|
||||
- Not currently claimed by any worker (or claim is stale)
|
||||
- Not blocked by an open task (e.g., conflict resolution in progress)
|
||||
|
||||
This is the preferred command for finding work to process.
|
||||
|
||||
Examples:
|
||||
gt refinery ready
|
||||
gt refinery ready --json`,
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
RunE: runRefineryReady,
|
||||
}
|
||||
|
||||
var refineryReadyJSON bool
|
||||
|
||||
var refineryBlockedCmd = &cobra.Command{
|
||||
Use: "blocked [rig]",
|
||||
Short: "List MRs blocked by open tasks",
|
||||
Long: `List merge requests blocked by open tasks.
|
||||
|
||||
Shows MRs waiting for conflict resolution or other blocking tasks to complete.
|
||||
When the blocking task closes, the MR will appear in 'ready'.
|
||||
|
||||
Examples:
|
||||
gt refinery blocked
|
||||
gt refinery blocked --json`,
|
||||
Args: cobra.MaximumNArgs(1),
|
||||
RunE: runRefineryBlocked,
|
||||
}
|
||||
|
||||
var refineryBlockedJSON bool
|
||||
|
||||
func init() {
|
||||
// Start flags
|
||||
refineryStartCmd.Flags().BoolVar(&refineryForeground, "foreground", false, "Run in foreground (default: background)")
|
||||
@@ -181,6 +218,12 @@ func init() {
|
||||
// Unclaimed flags
|
||||
refineryUnclaimedCmd.Flags().BoolVar(&refineryUnclaimedJSON, "json", false, "Output as JSON")
|
||||
|
||||
// Ready flags
|
||||
refineryReadyCmd.Flags().BoolVar(&refineryReadyJSON, "json", false, "Output as JSON")
|
||||
|
||||
// Blocked flags
|
||||
refineryBlockedCmd.Flags().BoolVar(&refineryBlockedJSON, "json", false, "Output as JSON")
|
||||
|
||||
// Add subcommands
|
||||
refineryCmd.AddCommand(refineryStartCmd)
|
||||
refineryCmd.AddCommand(refineryStopCmd)
|
||||
@@ -191,6 +234,8 @@ func init() {
|
||||
refineryCmd.AddCommand(refineryClaimCmd)
|
||||
refineryCmd.AddCommand(refineryReleaseCmd)
|
||||
refineryCmd.AddCommand(refineryUnclaimedCmd)
|
||||
refineryCmd.AddCommand(refineryReadyCmd)
|
||||
refineryCmd.AddCommand(refineryBlockedCmd)
|
||||
|
||||
rootCmd.AddCommand(refineryCmd)
|
||||
}
|
||||
@@ -571,3 +616,94 @@ func runRefineryUnclaimed(cmd *cobra.Command, args []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRefineryReady(cmd *cobra.Command, args []string) error {
|
||||
rigName := ""
|
||||
if len(args) > 0 {
|
||||
rigName = args[0]
|
||||
}
|
||||
|
||||
_, r, rigName, err := getRefineryManager(rigName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create engineer for the rig (it has beads access for status checking)
|
||||
eng := refinery.NewEngineer(r)
|
||||
|
||||
// Get ready MRs (unclaimed AND unblocked)
|
||||
ready, err := eng.ListReadyMRs()
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing ready MRs: %w", err)
|
||||
}
|
||||
|
||||
// JSON output
|
||||
if refineryReadyJSON {
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(ready)
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
fmt.Printf("%s Ready MRs for '%s':\n\n", style.Bold.Render("🚀"), rigName)
|
||||
|
||||
if len(ready) == 0 {
|
||||
fmt.Printf(" %s\n", style.Dim.Render("(none ready)"))
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, mr := range ready {
|
||||
priority := fmt.Sprintf("P%d", mr.Priority)
|
||||
fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target)
|
||||
fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func runRefineryBlocked(cmd *cobra.Command, args []string) error {
|
||||
rigName := ""
|
||||
if len(args) > 0 {
|
||||
rigName = args[0]
|
||||
}
|
||||
|
||||
_, r, rigName, err := getRefineryManager(rigName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create engineer for the rig (it has beads access for status checking)
|
||||
eng := refinery.NewEngineer(r)
|
||||
|
||||
// Get blocked MRs
|
||||
blocked, err := eng.ListBlockedMRs()
|
||||
if err != nil {
|
||||
return fmt.Errorf("listing blocked MRs: %w", err)
|
||||
}
|
||||
|
||||
// JSON output
|
||||
if refineryBlockedJSON {
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(blocked)
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
fmt.Printf("%s Blocked MRs for '%s':\n\n", style.Bold.Render("🚧"), rigName)
|
||||
|
||||
if len(blocked) == 0 {
|
||||
fmt.Printf(" %s\n", style.Dim.Render("(none blocked)"))
|
||||
return nil
|
||||
}
|
||||
|
||||
for i, mr := range blocked {
|
||||
priority := fmt.Sprintf("P%d", mr.Priority)
|
||||
fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target)
|
||||
fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker)
|
||||
if mr.BlockedBy != "" {
|
||||
fmt.Printf(" Blocked by: %s\n", mr.BlockedBy)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -36,6 +36,9 @@ type MR struct {
|
||||
// Claiming fields for parallel refinery workers
|
||||
ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
|
||||
ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
|
||||
|
||||
// Blocking fields for non-blocking delegation
|
||||
BlockedBy string `json:"blocked_by,omitempty"` // Task ID that blocks this MR (e.g., conflict resolution task)
|
||||
}
|
||||
|
||||
// Queue manages the MR storage.
|
||||
@@ -354,3 +357,113 @@ var (
|
||||
ErrNotFound = fmt.Errorf("merge request not found")
|
||||
ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker")
|
||||
)
|
||||
|
||||
// SetBlockedBy marks an MR as blocked by a task (e.g., conflict resolution).
|
||||
// When the blocking task closes, the MR becomes ready for processing again.
|
||||
func (q *Queue) SetBlockedBy(mrID, taskID string) error {
|
||||
path := filepath.Join(q.dir, mrID+".json")
|
||||
|
||||
mr, err := q.load(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("loading MR: %w", err)
|
||||
}
|
||||
|
||||
mr.BlockedBy = taskID
|
||||
|
||||
data, err := json.MarshalIndent(mr, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshaling MR: %w", err)
|
||||
}
|
||||
|
||||
return os.WriteFile(path, data, 0644)
|
||||
}
|
||||
|
||||
// ClearBlockedBy removes the blocking task from an MR.
|
||||
func (q *Queue) ClearBlockedBy(mrID string) error {
|
||||
return q.SetBlockedBy(mrID, "")
|
||||
}
|
||||
|
||||
// IsBlocked checks if an MR is blocked by a task that is still open.
|
||||
// If blocked, returns true and the blocking task ID.
|
||||
// checkStatus is a function that checks if a bead is still open.
|
||||
func (mr *MR) IsBlocked(checkStatus func(beadID string) (isOpen bool, err error)) (bool, string, error) {
|
||||
if mr.BlockedBy == "" {
|
||||
return false, "", nil
|
||||
}
|
||||
|
||||
isOpen, err := checkStatus(mr.BlockedBy)
|
||||
if err != nil {
|
||||
// If we can't check status, assume not blocked (fail open)
|
||||
return false, "", nil
|
||||
}
|
||||
|
||||
return isOpen, mr.BlockedBy, nil
|
||||
}
|
||||
|
||||
// BeadStatusChecker is a function type that checks if a bead is open.
|
||||
// Returns true if the bead is open (not closed), false if closed or not found.
|
||||
type BeadStatusChecker func(beadID string) (isOpen bool, err error)
|
||||
|
||||
// ListReady returns MRs that are ready for processing:
|
||||
// - Not claimed by another worker (or claim is stale)
|
||||
// - Not blocked by an open task
|
||||
// Sorted by priority score (highest first).
|
||||
// The checkStatus function is used to check if blocking tasks are still open.
|
||||
func (q *Queue) ListReady(checkStatus BeadStatusChecker) ([]*MR, error) {
|
||||
all, err := q.ListByScore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ready []*MR
|
||||
for _, mr := range all {
|
||||
// Skip if claimed by another worker (and not stale)
|
||||
if mr.ClaimedBy != "" {
|
||||
if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
|
||||
continue
|
||||
}
|
||||
// Stale claim - include in ready list
|
||||
}
|
||||
|
||||
// Skip if blocked by an open task
|
||||
if mr.BlockedBy != "" && checkStatus != nil {
|
||||
isOpen, err := checkStatus(mr.BlockedBy)
|
||||
if err == nil && isOpen {
|
||||
// Blocked by an open task - skip
|
||||
continue
|
||||
}
|
||||
// If error or task closed, proceed (fail open)
|
||||
}
|
||||
|
||||
ready = append(ready, mr)
|
||||
}
|
||||
|
||||
return ready, nil
|
||||
}
|
||||
|
||||
// ListBlocked returns MRs that are blocked by open tasks.
|
||||
// Useful for reporting/monitoring.
|
||||
func (q *Queue) ListBlocked(checkStatus BeadStatusChecker) ([]*MR, error) {
|
||||
all, err := q.List()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var blocked []*MR
|
||||
for _, mr := range all {
|
||||
if mr.BlockedBy == "" {
|
||||
continue
|
||||
}
|
||||
if checkStatus != nil {
|
||||
isOpen, err := checkStatus(mr.BlockedBy)
|
||||
if err == nil && isOpen {
|
||||
blocked = append(blocked, mr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return blocked, nil
|
||||
}
|
||||
|
||||
@@ -507,6 +507,8 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult)
|
||||
}
|
||||
|
||||
// handleFailureFromQueue handles a failed merge from wisp queue.
|
||||
// For conflicts, creates a resolution task and blocks the MR until resolved.
|
||||
// This enables non-blocking delegation: the queue continues to the next MR.
|
||||
func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) {
|
||||
// Emit merge_failed event
|
||||
if err := e.eventLogger.LogMergeFailed(mr, result.Error); err != nil {
|
||||
@@ -514,20 +516,34 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
|
||||
}
|
||||
|
||||
// If this was a conflict, create a conflict-resolution task for dispatch
|
||||
// and block the MR until the task is resolved (non-blocking delegation)
|
||||
if result.Conflict {
|
||||
if err := e.createConflictResolutionTask(mr, result); err != nil {
|
||||
taskID, err := e.createConflictResolutionTask(mr, result)
|
||||
if err != nil {
|
||||
fmt.Fprintf(e.output, "[Engineer] Warning: failed to create conflict resolution task: %v\n", err)
|
||||
} else {
|
||||
// Block the MR on the conflict resolution task
|
||||
// When the task closes, the MR unblocks and re-enters the ready queue
|
||||
if err := e.mrQueue.SetBlockedBy(mr.ID, taskID); err != nil {
|
||||
fmt.Fprintf(e.output, "[Engineer] Warning: failed to block MR on task: %v\n", err)
|
||||
} else {
|
||||
fmt.Fprintf(e.output, "[Engineer] MR %s blocked on conflict task %s (non-blocking delegation)\n", mr.ID, taskID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MR stays in queue for retry - no action needed on the file
|
||||
// Log the failure
|
||||
// Log the failure - MR stays in queue but may be blocked
|
||||
fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)
|
||||
fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry")
|
||||
if mr.BlockedBy != "" {
|
||||
fmt.Fprintln(e.output, "[Engineer] MR blocked pending conflict resolution - queue continues to next MR")
|
||||
} else {
|
||||
fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry")
|
||||
}
|
||||
}
|
||||
|
||||
// createConflictResolutionTask creates a dispatchable task for resolving merge conflicts.
|
||||
// This task will be picked up by bd ready and can be dispatched to an available polecat.
|
||||
// Returns the created task's ID for blocking the MR until resolution.
|
||||
//
|
||||
// Task format:
|
||||
// Title: Resolve merge conflicts: <original-issue-title>
|
||||
@@ -535,7 +551,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
|
||||
// Priority: inherit from original + boost (P2 -> P1)
|
||||
// Parent: original MR bead
|
||||
// Description: metadata including branch, conflict SHA, etc.
|
||||
func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) error {
|
||||
func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) (string, error) {
|
||||
// Get the current main SHA for conflict tracking
|
||||
mainSHA, err := e.git.Rev("origin/" + mr.Target)
|
||||
if err != nil {
|
||||
@@ -599,17 +615,42 @@ The Refinery will automatically retry the merge after you force-push.`,
|
||||
Actor: e.rig.Name + "/refinery",
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating conflict resolution task: %w", err)
|
||||
return "", fmt.Errorf("creating conflict resolution task: %w", err)
|
||||
}
|
||||
|
||||
// Add dependency: the conflict task depends on nothing, but the MR depends on the task
|
||||
// Note: We don't add the task as parent of the MR since MRs are ephemeral in the queue
|
||||
// The task itself serves as the dispatchable work unit
|
||||
// The conflict task's ID is returned so the MR can be blocked on it.
|
||||
// When the task closes, the MR unblocks and re-enters the ready queue.
|
||||
|
||||
fmt.Fprintf(e.output, "[Engineer] Created conflict resolution task: %s (P%d)\n", task.ID, task.Priority)
|
||||
|
||||
// Update the MR's retry count for priority scoring
|
||||
mr.RetryCount = retryCount
|
||||
|
||||
return nil
|
||||
return task.ID, nil
|
||||
}
|
||||
|
||||
// IsBeadOpen checks if a bead is still open (not closed).
|
||||
// This is used as a status checker for mrqueue.ListReady to filter blocked MRs.
|
||||
func (e *Engineer) IsBeadOpen(beadID string) (bool, error) {
|
||||
issue, err := e.beads.Show(beadID)
|
||||
if err != nil {
|
||||
// If we can't find the bead, treat as not open (fail open - allow MR to proceed)
|
||||
return false, nil
|
||||
}
|
||||
// "closed" status means the bead is done
|
||||
return issue.Status != "closed", nil
|
||||
}
|
||||
|
||||
// ListReadyMRs returns MRs that are ready for processing:
|
||||
// - Not claimed by another worker (or claim is stale)
|
||||
// - Not blocked by an open task
|
||||
// Sorted by priority score (highest first).
|
||||
func (e *Engineer) ListReadyMRs() ([]*mrqueue.MR, error) {
|
||||
return e.mrQueue.ListReady(e.IsBeadOpen)
|
||||
}
|
||||
|
||||
// ListBlockedMRs returns MRs that are blocked by open tasks.
|
||||
// Useful for monitoring/reporting.
|
||||
func (e *Engineer) ListBlockedMRs() ([]*mrqueue.MR, error) {
|
||||
return e.mrQueue.ListBlocked(e.IsBeadOpen)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user