From 8517ff065028721375a7cc6660ec0c244e1aa40f Mon Sep 17 00:00:00 2001 From: nux Date: Fri, 2 Jan 2026 17:16:29 -0800 Subject: [PATCH] feat(mq): Add priority-ordered queue display and processing (gt-si8rq.6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements priority scoring for merge queue ordering: ## Changes to gt mq list - Add SCORE column showing priority score (higher = process first) - Sort MRs by score descending instead of simple priority - Add CONVOY column showing convoy ID if tracked ## New gt mq next command - Returns highest-score MR ready for processing - Supports --strategy=fifo for FIFO ordering fallback - Supports --quiet for just printing MR ID - Supports --json for programmatic access ## Changes to Refinery - Queue() now sorts by priority score instead of simple priority - Uses ScoreMR from mrqueue package for consistent scoring ## MR Fields Extended - Added retry_count, last_conflict_sha, conflict_task_id - Added convoy_id, convoy_created_at for convoy tracking - These fields feed into priority scoring function 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/beads/fields.go | 101 +++++++++++++++++---- internal/cmd/mq_list.go | 100 ++++++++++++++++++--- internal/cmd/mq_next.go | 167 +++++++++++++++++++++++++++++++++++ internal/refinery/manager.go | 57 ++++++++++-- 4 files changed, 388 insertions(+), 37 deletions(-) create mode 100644 internal/cmd/mq_next.go diff --git a/internal/beads/fields.go b/internal/beads/fields.go index b574955f..b0cf2b15 100644 --- a/internal/beads/fields.go +++ b/internal/beads/fields.go @@ -1,7 +1,10 @@ // Package beads provides field parsing utilities for structured issue descriptions. package beads -import "strings" +import ( + "fmt" + "strings" +) // Note: AgentFields, ParseAgentFields, FormatAgentDescription, and CreateAgentBead are in beads.go @@ -165,6 +168,15 @@ type MRFields struct { MergeCommit string // SHA of merge commit (set on close) CloseReason string // Reason for closing: merged, rejected, conflict, superseded AgentBead string // Agent bead ID that created this MR (for traceability) + + // Conflict resolution fields (for priority scoring) + RetryCount int // Number of conflict-resolution cycles + LastConflictSHA string // SHA of main when conflict occurred + ConflictTaskID string // Link to conflict-resolution task (if any) + + // Convoy tracking (for priority scoring - convoy starvation prevention) + ConvoyID string // Parent convoy ID if part of a convoy + ConvoyCreatedAt string // Convoy creation time (ISO 8601) for starvation prevention } // ParseMRFields extracts structured merge-request fields from an issue's description. @@ -222,6 +234,23 @@ func ParseMRFields(issue *Issue) *MRFields { case "agent_bead", "agent-bead", "agentbead": fields.AgentBead = value hasFields = true + case "retry_count", "retry-count", "retrycount": + if n, err := parseIntField(value); err == nil { + fields.RetryCount = n + hasFields = true + } + case "last_conflict_sha", "last-conflict-sha", "lastconflictsha": + fields.LastConflictSHA = value + hasFields = true + case "conflict_task_id", "conflict-task-id", "conflicttaskid": + fields.ConflictTaskID = value + hasFields = true + case "convoy_id", "convoy-id", "convoyid", "convoy": + fields.ConvoyID = value + hasFields = true + case "convoy_created_at", "convoy-created-at", "convoycreatedat": + fields.ConvoyCreatedAt = value + hasFields = true } } @@ -231,6 +260,13 @@ func ParseMRFields(issue *Issue) *MRFields { return fields } +// parseIntField parses an integer from a string, returning 0 on error. +func parseIntField(s string) (int, error) { + var n int + _, err := fmt.Sscanf(s, "%d", &n) + return n, err +} + // FormatMRFields formats MRFields as a string suitable for an issue description. // Only non-empty fields are included. func FormatMRFields(fields *MRFields) string { @@ -264,6 +300,21 @@ func FormatMRFields(fields *MRFields) string { if fields.AgentBead != "" { lines = append(lines, "agent_bead: "+fields.AgentBead) } + if fields.RetryCount > 0 { + lines = append(lines, fmt.Sprintf("retry_count: %d", fields.RetryCount)) + } + if fields.LastConflictSHA != "" { + lines = append(lines, "last_conflict_sha: "+fields.LastConflictSHA) + } + if fields.ConflictTaskID != "" { + lines = append(lines, "conflict_task_id: "+fields.ConflictTaskID) + } + if fields.ConvoyID != "" { + lines = append(lines, "convoy_id: "+fields.ConvoyID) + } + if fields.ConvoyCreatedAt != "" { + lines = append(lines, "convoy_created_at: "+fields.ConvoyCreatedAt) + } return strings.Join(lines, "\n") } @@ -278,22 +329,38 @@ func SetMRFields(issue *Issue, fields *MRFields) string { // Known MR field keys (lowercase) mrKeys := map[string]bool{ - "branch": true, - "target": true, - "source_issue": true, - "source-issue": true, - "sourceissue": true, - "worker": true, - "rig": true, - "merge_commit": true, - "merge-commit": true, - "mergecommit": true, - "close_reason": true, - "close-reason": true, - "closereason": true, - "agent_bead": true, - "agent-bead": true, - "agentbead": true, + "branch": true, + "target": true, + "source_issue": true, + "source-issue": true, + "sourceissue": true, + "worker": true, + "rig": true, + "merge_commit": true, + "merge-commit": true, + "mergecommit": true, + "close_reason": true, + "close-reason": true, + "closereason": true, + "agent_bead": true, + "agent-bead": true, + "agentbead": true, + "retry_count": true, + "retry-count": true, + "retrycount": true, + "last_conflict_sha": true, + "last-conflict-sha": true, + "lastconflictsha": true, + "conflict_task_id": true, + "conflict-task-id": true, + "conflicttaskid": true, + "convoy_id": true, + "convoy-id": true, + "convoyid": true, + "convoy": true, + "convoy_created_at": true, + "convoy-created-at": true, + "convoycreatedat": true, } // Collect non-MR lines from existing description diff --git a/internal/cmd/mq_list.go b/internal/cmd/mq_list.go index bface5eb..0feac99b 100644 --- a/internal/cmd/mq_list.go +++ b/internal/cmd/mq_list.go @@ -4,11 +4,13 @@ import ( "encoding/json" "fmt" "os" + "sort" "strings" "time" "github.com/spf13/cobra" "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/style" ) @@ -59,8 +61,15 @@ func runMQList(cmd *cobra.Command, args []string) error { } } - // Apply additional filters - var filtered []*beads.Issue + // Apply additional filters and calculate scores + now := time.Now() + type scoredIssue struct { + issue *beads.Issue + fields *beads.MRFields + score float64 + } + var scored []scoredIssue + for _, issue := range issues { // Parse MR fields fields := beads.ParseMRFields(issue) @@ -88,7 +97,20 @@ func runMQList(cmd *cobra.Command, args []string) error { } } - filtered = append(filtered, issue) + // Calculate priority score + score := calculateMRScore(issue, fields, now) + scored = append(scored, scoredIssue{issue: issue, fields: fields, score: score}) + } + + // Sort by score descending (highest priority first) + sort.Slice(scored, func(i, j int) bool { + return scored[i].score > scored[j].score + }) + + // Extract filtered issues for JSON output compatibility + var filtered []*beads.Issue + for _, s := range scored { + filtered = append(filtered, s.issue) } // JSON output @@ -104,19 +126,21 @@ func runMQList(cmd *cobra.Command, args []string) error { return nil } - // Create styled table + // Create styled table with SCORE column table := style.NewTable( style.Column{Name: "ID", Width: 12}, - style.Column{Name: "STATUS", Width: 12}, + style.Column{Name: "SCORE", Width: 7, Align: style.AlignRight}, style.Column{Name: "PRI", Width: 4}, - style.Column{Name: "BRANCH", Width: 28}, - style.Column{Name: "WORKER", Width: 10}, + style.Column{Name: "CONVOY", Width: 12}, + style.Column{Name: "BRANCH", Width: 24}, + style.Column{Name: "STATUS", Width: 10}, style.Column{Name: "AGE", Width: 6, Align: style.AlignRight}, ) - // Add rows - for _, issue := range filtered { - fields := beads.ParseMRFields(issue) + // Add rows using scored items (already sorted by score) + for _, item := range scored { + issue := item.issue + fields := item.fields // Determine display status displayStatus := issue.Status @@ -143,10 +167,20 @@ func runMQList(cmd *cobra.Command, args []string) error { // Get MR fields branch := "" - worker := "" + convoyID := "" if fields != nil { branch = fields.Branch - worker = fields.Worker + convoyID = fields.ConvoyID + } + + // Format convoy column + convoyDisplay := style.Dim.Render("(none)") + if convoyID != "" { + // Truncate convoy ID for display + if len(convoyID) > 12 { + convoyID = convoyID[:12] + } + convoyDisplay = convoyID } // Format priority with color @@ -157,6 +191,9 @@ func runMQList(cmd *cobra.Command, args []string) error { priority = style.Warning.Render(priority) } + // Format score + scoreStr := fmt.Sprintf("%.1f", item.score) + // Calculate age age := formatMRAge(issue.CreatedAt) @@ -166,13 +203,14 @@ func runMQList(cmd *cobra.Command, args []string) error { displayID = displayID[:12] } - table.AddRow(displayID, styledStatus, priority, branch, worker, style.Dim.Render(age)) + table.AddRow(displayID, scoreStr, priority, convoyDisplay, branch, styledStatus, style.Dim.Render(age)) } fmt.Print(table.Render()) // Show blocking details below table - for _, issue := range filtered { + for _, item := range scored { + issue := item.issue displayStatus := issue.Status if issue.Status == "open" && (len(issue.BlockedBy) > 0 || issue.BlockedByCount > 0) { displayStatus = "blocked" @@ -221,3 +259,37 @@ func outputJSON(data interface{}) error { enc.SetIndent("", " ") return enc.Encode(data) } + +// calculateMRScore computes the priority score for an MR using the mrqueue scoring function. +// Higher scores mean higher priority (process first). +func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time) float64 { + // Parse MR creation time + mrCreatedAt, err := time.Parse(time.RFC3339, issue.CreatedAt) + if err != nil { + mrCreatedAt, err = time.Parse("2006-01-02T15:04:05Z", issue.CreatedAt) + if err != nil { + mrCreatedAt = now // Fallback to now if parsing fails + } + } + + // Build score input + input := mrqueue.ScoreInput{ + Priority: issue.Priority, + MRCreatedAt: mrCreatedAt, + Now: now, + } + + // Add fields from MR metadata if available + if fields != nil { + input.RetryCount = fields.RetryCount + + // Parse convoy created at if available + if fields.ConvoyCreatedAt != "" { + if convoyTime, err := time.Parse(time.RFC3339, fields.ConvoyCreatedAt); err == nil { + input.ConvoyCreatedAt = &convoyTime + } + } + } + + return mrqueue.ScoreMRWithDefaults(input) +} diff --git a/internal/cmd/mq_next.go b/internal/cmd/mq_next.go new file mode 100644 index 00000000..dd0a9148 --- /dev/null +++ b/internal/cmd/mq_next.go @@ -0,0 +1,167 @@ +package cmd + +import ( + "fmt" + "sort" + "time" + + "github.com/spf13/cobra" + "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/style" +) + +// MQ next command flags +var ( + mqNextStrategy string // "priority" (default) or "fifo" + mqNextJSON bool + mqNextQuiet bool +) + +var mqNextCmd = &cobra.Command{ + Use: "next ", + Short: "Show the highest-priority merge request", + Long: `Show the next merge request to process based on priority score. + +The priority scoring function considers: + - Convoy age: Older convoys get higher priority (starvation prevention) + - Issue priority: P0 > P1 > P2 > P3 > P4 + - Retry count: MRs that fail repeatedly get deprioritized + - MR age: FIFO tiebreaker for same priority/convoy + +Use --strategy=fifo for first-in-first-out ordering instead. + +Examples: + gt mq next gastown # Show highest-priority MR + gt mq next gastown --strategy=fifo # Show oldest MR instead + gt mq next gastown --quiet # Just print the MR ID + gt mq next gastown --json # Output as JSON`, + Args: cobra.ExactArgs(1), + RunE: runMQNext, +} + +func init() { + mqNextCmd.Flags().StringVar(&mqNextStrategy, "strategy", "priority", "Ordering strategy: 'priority' or 'fifo'") + mqNextCmd.Flags().BoolVar(&mqNextJSON, "json", false, "Output as JSON") + mqNextCmd.Flags().BoolVarP(&mqNextQuiet, "quiet", "q", false, "Just print the MR ID") + + mqCmd.AddCommand(mqNextCmd) +} + +func runMQNext(cmd *cobra.Command, args []string) error { + rigName := args[0] + + _, r, _, err := getRefineryManager(rigName) + if err != nil { + return err + } + + // Create beads wrapper for the rig + b := beads.New(r.BeadsPath()) + + // Query for open merge-requests (ready to process) + opts := beads.ListOptions{ + Type: "merge-request", + Status: "open", + Priority: -1, // No priority filter + } + + issues, err := b.List(opts) + if err != nil { + return fmt.Errorf("querying merge queue: %w", err) + } + + // Filter to only ready MRs (no blockers) + var ready []*beads.Issue + for _, issue := range issues { + if len(issue.BlockedBy) == 0 && issue.BlockedByCount == 0 { + ready = append(ready, issue) + } + } + + if len(ready) == 0 { + if mqNextQuiet { + return nil // Silent exit + } + fmt.Printf("%s No ready merge requests in queue\n", style.Dim.Render("ℹ")) + return nil + } + + now := time.Now() + + // Sort based on strategy + if mqNextStrategy == "fifo" { + // FIFO: oldest first by creation time + sort.Slice(ready, func(i, j int) bool { + ti, _ := time.Parse(time.RFC3339, ready[i].CreatedAt) + tj, _ := time.Parse(time.RFC3339, ready[j].CreatedAt) + return ti.Before(tj) + }) + } else { + // Priority: highest score first + type scoredIssue struct { + issue *beads.Issue + score float64 + } + scored := make([]scoredIssue, len(ready)) + for i, issue := range ready { + fields := beads.ParseMRFields(issue) + score := calculateMRScore(issue, fields, now) + scored[i] = scoredIssue{issue: issue, score: score} + } + + sort.Slice(scored, func(i, j int) bool { + return scored[i].score > scored[j].score + }) + + // Rebuild ready slice in sorted order + for i, s := range scored { + ready[i] = s.issue + } + } + + // Get the top MR + next := ready[0] + fields := beads.ParseMRFields(next) + + // Output based on format flags + if mqNextQuiet { + fmt.Println(next.ID) + return nil + } + + if mqNextJSON { + return outputJSON(next) + } + + // Human-readable output + fmt.Printf("%s Next MR to process:\n\n", style.Bold.Render("🎯")) + + score := calculateMRScore(next, fields, now) + + fmt.Printf(" ID: %s\n", next.ID) + fmt.Printf(" Score: %.1f\n", score) + fmt.Printf(" Priority: P%d\n", next.Priority) + + if fields != nil { + if fields.Branch != "" { + fmt.Printf(" Branch: %s\n", fields.Branch) + } + if fields.Worker != "" { + fmt.Printf(" Worker: %s\n", fields.Worker) + } + if fields.ConvoyID != "" { + fmt.Printf(" Convoy: %s\n", fields.ConvoyID) + } + if fields.RetryCount > 0 { + fmt.Printf(" Retries: %d\n", fields.RetryCount) + } + } + + fmt.Printf(" Age: %s\n", formatMRAge(next.CreatedAt)) + + if len(ready) > 1 { + fmt.Printf("\n %s\n", style.Dim.Render(fmt.Sprintf("(%d more in queue)", len(ready)-1))) + } + + return nil +} diff --git a/internal/refinery/manager.go b/internal/refinery/manager.go index ca59187d..9303734c 100644 --- a/internal/refinery/manager.go +++ b/internal/refinery/manager.go @@ -18,6 +18,7 @@ import ( "github.com/steveyegge/gastown/internal/config" "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/mail" + "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/rig" "github.com/steveyegge/gastown/internal/tmux" "github.com/steveyegge/gastown/internal/util" @@ -273,14 +274,25 @@ func (m *Manager) Queue() ([]QueueItem, error) { }) } - // Sort issues by priority (P0 first, then P1, etc.) - sort.Slice(issues, func(i, j int) bool { - return issues[i].Priority < issues[j].Priority + // Score and sort issues by priority score (highest first) + now := time.Now() + type scoredIssue struct { + issue *beads.Issue + score float64 + } + scored := make([]scoredIssue, 0, len(issues)) + for _, issue := range issues { + score := m.calculateIssueScore(issue, now) + scored = append(scored, scoredIssue{issue: issue, score: score}) + } + + sort.Slice(scored, func(i, j int) bool { + return scored[i].score > scored[j].score }) - // Convert beads issues to queue items - for _, issue := range issues { - mr := m.issueToMR(issue) + // Convert scored issues to queue items + for _, s := range scored { + mr := m.issueToMR(s.issue) if mr != nil { // Skip if this is the currently processing MR if ref.CurrentMR != nil && ref.CurrentMR.ID == mr.ID { @@ -298,6 +310,39 @@ func (m *Manager) Queue() ([]QueueItem, error) { return items, nil } +// calculateIssueScore computes the priority score for an MR issue. +// Higher scores mean higher priority (process first). +func (m *Manager) calculateIssueScore(issue *beads.Issue, now time.Time) float64 { + fields := beads.ParseMRFields(issue) + + // Parse MR creation time + mrCreatedAt := parseTime(issue.CreatedAt) + if mrCreatedAt.IsZero() { + mrCreatedAt = now // Fallback + } + + // Build score input + input := mrqueue.ScoreInput{ + Priority: issue.Priority, + MRCreatedAt: mrCreatedAt, + Now: now, + } + + // Add fields from MR metadata if available + if fields != nil { + input.RetryCount = fields.RetryCount + + // Parse convoy created at if available + if fields.ConvoyCreatedAt != "" { + if convoyTime := parseTime(fields.ConvoyCreatedAt); !convoyTime.IsZero() { + input.ConvoyCreatedAt = &convoyTime + } + } + } + + return mrqueue.ScoreMRWithDefaults(input) +} + // issueToMR converts a beads issue to a MergeRequest. func (m *Manager) issueToMR(issue *beads.Issue) *MergeRequest { if issue == nil {