From f7839c2724c789f72479d208cba768b68543c51d Mon Sep 17 00:00:00 2001 From: nux Date: Fri, 2 Jan 2026 17:59:04 -0800 Subject: [PATCH] feat(refinery): Non-blocking delegation via bead-gates (gt-hibbj) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When merge conflicts occur, the Refinery now creates a conflict resolution task and blocks the MR on that task, allowing the queue to continue to the next MR without waiting. Changes: - Add BlockedBy field to mrqueue.MR for tracking blocking tasks - Update handleFailureFromQueue to set BlockedBy after creating conflict task - Add ListReady method to mrqueue that filters out blocked MRs - Add ListBlocked method for monitoring blocked MRs - Add IsBeadOpen, ListReadyMRs, ListBlockedMRs helpers to Engineer - Add 'gt refinery ready' command (unclaimed AND unblocked MRs) - Add 'gt refinery blocked' command (shows blocked MRs) When the conflict resolution task closes, the MR unblocks and re-enters the ready queue for processing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/cmd/refinery.go | 136 ++++++++++++++++++++++++++++++++++ internal/mrqueue/mrqueue.go | 113 ++++++++++++++++++++++++++++ internal/refinery/engineer.go | 61 ++++++++++++--- 3 files changed, 300 insertions(+), 10 deletions(-) diff --git a/internal/cmd/refinery.go b/internal/cmd/refinery.go index 30084b66..cf44cd25 100644 --- a/internal/cmd/refinery.go +++ b/internal/cmd/refinery.go @@ -168,6 +168,43 @@ Examples: var refineryUnclaimedJSON bool +var refineryReadyCmd = &cobra.Command{ + Use: "ready [rig]", + Short: "List MRs ready for processing (unclaimed and unblocked)", + Long: `List merge requests ready for processing. + +Shows MRs that are: +- Not currently claimed by any worker (or claim is stale) +- Not blocked by an open task (e.g., conflict resolution in progress) + +This is the preferred command for finding work to process. + +Examples: + gt refinery ready + gt refinery ready --json`, + Args: cobra.MaximumNArgs(1), + RunE: runRefineryReady, +} + +var refineryReadyJSON bool + +var refineryBlockedCmd = &cobra.Command{ + Use: "blocked [rig]", + Short: "List MRs blocked by open tasks", + Long: `List merge requests blocked by open tasks. + +Shows MRs waiting for conflict resolution or other blocking tasks to complete. +When the blocking task closes, the MR will appear in 'ready'. + +Examples: + gt refinery blocked + gt refinery blocked --json`, + Args: cobra.MaximumNArgs(1), + RunE: runRefineryBlocked, +} + +var refineryBlockedJSON bool + func init() { // Start flags refineryStartCmd.Flags().BoolVar(&refineryForeground, "foreground", false, "Run in foreground (default: background)") @@ -181,6 +218,12 @@ func init() { // Unclaimed flags refineryUnclaimedCmd.Flags().BoolVar(&refineryUnclaimedJSON, "json", false, "Output as JSON") + // Ready flags + refineryReadyCmd.Flags().BoolVar(&refineryReadyJSON, "json", false, "Output as JSON") + + // Blocked flags + refineryBlockedCmd.Flags().BoolVar(&refineryBlockedJSON, "json", false, "Output as JSON") + // Add subcommands refineryCmd.AddCommand(refineryStartCmd) refineryCmd.AddCommand(refineryStopCmd) @@ -191,6 +234,8 @@ func init() { refineryCmd.AddCommand(refineryClaimCmd) refineryCmd.AddCommand(refineryReleaseCmd) refineryCmd.AddCommand(refineryUnclaimedCmd) + refineryCmd.AddCommand(refineryReadyCmd) + refineryCmd.AddCommand(refineryBlockedCmd) rootCmd.AddCommand(refineryCmd) } @@ -571,3 +616,94 @@ func runRefineryUnclaimed(cmd *cobra.Command, args []string) error { return nil } + +func runRefineryReady(cmd *cobra.Command, args []string) error { + rigName := "" + if len(args) > 0 { + rigName = args[0] + } + + _, r, rigName, err := getRefineryManager(rigName) + if err != nil { + return err + } + + // Create engineer for the rig (it has beads access for status checking) + eng := refinery.NewEngineer(r) + + // Get ready MRs (unclaimed AND unblocked) + ready, err := eng.ListReadyMRs() + if err != nil { + return fmt.Errorf("listing ready MRs: %w", err) + } + + // JSON output + if refineryReadyJSON { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(ready) + } + + // Human-readable output + fmt.Printf("%s Ready MRs for '%s':\n\n", style.Bold.Render("🚀"), rigName) + + if len(ready) == 0 { + fmt.Printf(" %s\n", style.Dim.Render("(none ready)")) + return nil + } + + for i, mr := range ready { + priority := fmt.Sprintf("P%d", mr.Priority) + fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target) + fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker) + } + + return nil +} + +func runRefineryBlocked(cmd *cobra.Command, args []string) error { + rigName := "" + if len(args) > 0 { + rigName = args[0] + } + + _, r, rigName, err := getRefineryManager(rigName) + if err != nil { + return err + } + + // Create engineer for the rig (it has beads access for status checking) + eng := refinery.NewEngineer(r) + + // Get blocked MRs + blocked, err := eng.ListBlockedMRs() + if err != nil { + return fmt.Errorf("listing blocked MRs: %w", err) + } + + // JSON output + if refineryBlockedJSON { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + return enc.Encode(blocked) + } + + // Human-readable output + fmt.Printf("%s Blocked MRs for '%s':\n\n", style.Bold.Render("🚧"), rigName) + + if len(blocked) == 0 { + fmt.Printf(" %s\n", style.Dim.Render("(none blocked)")) + return nil + } + + for i, mr := range blocked { + priority := fmt.Sprintf("P%d", mr.Priority) + fmt.Printf(" %d. [%s] %s → %s\n", i+1, priority, mr.Branch, mr.Target) + fmt.Printf(" ID: %s Worker: %s\n", mr.ID, mr.Worker) + if mr.BlockedBy != "" { + fmt.Printf(" Blocked by: %s\n", mr.BlockedBy) + } + } + + return nil +} diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go index 54aedf0e..06f2aaaf 100644 --- a/internal/mrqueue/mrqueue.go +++ b/internal/mrqueue/mrqueue.go @@ -36,6 +36,9 @@ type MR struct { // Claiming fields for parallel refinery workers ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed + + // Blocking fields for non-blocking delegation + BlockedBy string `json:"blocked_by,omitempty"` // Task ID that blocks this MR (e.g., conflict resolution task) } // Queue manages the MR storage. @@ -354,3 +357,113 @@ var ( ErrNotFound = fmt.Errorf("merge request not found") ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker") ) + +// SetBlockedBy marks an MR as blocked by a task (e.g., conflict resolution). +// When the blocking task closes, the MR becomes ready for processing again. +func (q *Queue) SetBlockedBy(mrID, taskID string) error { + path := filepath.Join(q.dir, mrID+".json") + + mr, err := q.load(path) + if err != nil { + if os.IsNotExist(err) { + return ErrNotFound + } + return fmt.Errorf("loading MR: %w", err) + } + + mr.BlockedBy = taskID + + data, err := json.MarshalIndent(mr, "", " ") + if err != nil { + return fmt.Errorf("marshaling MR: %w", err) + } + + return os.WriteFile(path, data, 0644) +} + +// ClearBlockedBy removes the blocking task from an MR. +func (q *Queue) ClearBlockedBy(mrID string) error { + return q.SetBlockedBy(mrID, "") +} + +// IsBlocked checks if an MR is blocked by a task that is still open. +// If blocked, returns true and the blocking task ID. +// checkStatus is a function that checks if a bead is still open. +func (mr *MR) IsBlocked(checkStatus func(beadID string) (isOpen bool, err error)) (bool, string, error) { + if mr.BlockedBy == "" { + return false, "", nil + } + + isOpen, err := checkStatus(mr.BlockedBy) + if err != nil { + // If we can't check status, assume not blocked (fail open) + return false, "", nil + } + + return isOpen, mr.BlockedBy, nil +} + +// BeadStatusChecker is a function type that checks if a bead is open. +// Returns true if the bead is open (not closed), false if closed or not found. +type BeadStatusChecker func(beadID string) (isOpen bool, err error) + +// ListReady returns MRs that are ready for processing: +// - Not claimed by another worker (or claim is stale) +// - Not blocked by an open task +// Sorted by priority score (highest first). +// The checkStatus function is used to check if blocking tasks are still open. +func (q *Queue) ListReady(checkStatus BeadStatusChecker) ([]*MR, error) { + all, err := q.ListByScore() + if err != nil { + return nil, err + } + + var ready []*MR + for _, mr := range all { + // Skip if claimed by another worker (and not stale) + if mr.ClaimedBy != "" { + if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout { + continue + } + // Stale claim - include in ready list + } + + // Skip if blocked by an open task + if mr.BlockedBy != "" && checkStatus != nil { + isOpen, err := checkStatus(mr.BlockedBy) + if err == nil && isOpen { + // Blocked by an open task - skip + continue + } + // If error or task closed, proceed (fail open) + } + + ready = append(ready, mr) + } + + return ready, nil +} + +// ListBlocked returns MRs that are blocked by open tasks. +// Useful for reporting/monitoring. +func (q *Queue) ListBlocked(checkStatus BeadStatusChecker) ([]*MR, error) { + all, err := q.List() + if err != nil { + return nil, err + } + + var blocked []*MR + for _, mr := range all { + if mr.BlockedBy == "" { + continue + } + if checkStatus != nil { + isOpen, err := checkStatus(mr.BlockedBy) + if err == nil && isOpen { + blocked = append(blocked, mr) + } + } + } + + return blocked, nil +} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index b567c759..63767a85 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -507,6 +507,8 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) } // handleFailureFromQueue handles a failed merge from wisp queue. +// For conflicts, creates a resolution task and blocks the MR until resolved. +// This enables non-blocking delegation: the queue continues to the next MR. func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) { // Emit merge_failed event if err := e.eventLogger.LogMergeFailed(mr, result.Error); err != nil { @@ -514,20 +516,34 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) } // If this was a conflict, create a conflict-resolution task for dispatch + // and block the MR until the task is resolved (non-blocking delegation) if result.Conflict { - if err := e.createConflictResolutionTask(mr, result); err != nil { + taskID, err := e.createConflictResolutionTask(mr, result) + if err != nil { fmt.Fprintf(e.output, "[Engineer] Warning: failed to create conflict resolution task: %v\n", err) + } else { + // Block the MR on the conflict resolution task + // When the task closes, the MR unblocks and re-enters the ready queue + if err := e.mrQueue.SetBlockedBy(mr.ID, taskID); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to block MR on task: %v\n", err) + } else { + fmt.Fprintf(e.output, "[Engineer] MR %s blocked on conflict task %s (non-blocking delegation)\n", mr.ID, taskID) + } } } - // MR stays in queue for retry - no action needed on the file - // Log the failure + // Log the failure - MR stays in queue but may be blocked fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) - fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry") + if mr.BlockedBy != "" { + fmt.Fprintln(e.output, "[Engineer] MR blocked pending conflict resolution - queue continues to next MR") + } else { + fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry") + } } // createConflictResolutionTask creates a dispatchable task for resolving merge conflicts. // This task will be picked up by bd ready and can be dispatched to an available polecat. +// Returns the created task's ID for blocking the MR until resolution. // // Task format: // Title: Resolve merge conflicts: @@ -535,7 +551,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) // Priority: inherit from original + boost (P2 -> P1) // Parent: original MR bead // Description: metadata including branch, conflict SHA, etc. -func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) error { +func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, result ProcessResult) (string, error) { // Get the current main SHA for conflict tracking mainSHA, err := e.git.Rev("origin/" + mr.Target) if err != nil { @@ -599,17 +615,42 @@ The Refinery will automatically retry the merge after you force-push.`, Actor: e.rig.Name + "/refinery", }) if err != nil { - return fmt.Errorf("creating conflict resolution task: %w", err) + return "", fmt.Errorf("creating conflict resolution task: %w", err) } - // Add dependency: the conflict task depends on nothing, but the MR depends on the task - // Note: We don't add the task as parent of the MR since MRs are ephemeral in the queue - // The task itself serves as the dispatchable work unit + // The conflict task's ID is returned so the MR can be blocked on it. + // When the task closes, the MR unblocks and re-enters the ready queue. fmt.Fprintf(e.output, "[Engineer] Created conflict resolution task: %s (P%d)\n", task.ID, task.Priority) // Update the MR's retry count for priority scoring mr.RetryCount = retryCount - return nil + return task.ID, nil +} + +// IsBeadOpen checks if a bead is still open (not closed). +// This is used as a status checker for mrqueue.ListReady to filter blocked MRs. +func (e *Engineer) IsBeadOpen(beadID string) (bool, error) { + issue, err := e.beads.Show(beadID) + if err != nil { + // If we can't find the bead, treat as not open (fail open - allow MR to proceed) + return false, nil + } + // "closed" status means the bead is done + return issue.Status != "closed", nil +} + +// ListReadyMRs returns MRs that are ready for processing: +// - Not claimed by another worker (or claim is stale) +// - Not blocked by an open task +// Sorted by priority score (highest first). +func (e *Engineer) ListReadyMRs() ([]*mrqueue.MR, error) { + return e.mrQueue.ListReady(e.IsBeadOpen) +} + +// ListBlockedMRs returns MRs that are blocked by open tasks. +// Useful for monitoring/reporting. +func (e *Engineer) ListBlockedMRs() ([]*mrqueue.MR, error) { + return e.mrQueue.ListBlocked(e.IsBeadOpen) }