feat(mrqueue): Add MQ priority objective function (gt-si8rq.1)

Implement ScoreMR function for merge queue priority ordering with:
- Convoy age factor (prevents starvation of old convoys)
- Priority factor (P0 beats P4)
- Retry penalty (prevents thrashing on conflict-prone MRs)
- MR age tiebreaker (FIFO within same priority)

Added fields to MR struct:
- RetryCount for conflict retry tracking
- ConvoyID and ConvoyCreatedAt for convoy linkage

Includes comprehensive unit tests and documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
furiosa
2026-01-02 01:26:03 -08:00
committed by Steve Yegge
parent 87d79c40ee
commit 17fd366888
3 changed files with 586 additions and 0 deletions

View File

@@ -28,6 +28,11 @@ type MR struct {
CreatedAt time.Time `json:"created_at"`
AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability)
// Priority scoring fields
RetryCount int `json:"retry_count,omitempty"` // Conflict retry count for priority penalty
ConvoyID string `json:"convoy_id,omitempty"` // Parent convoy ID if part of a convoy
ConvoyCreatedAt *time.Time `json:"convoy_created_at,omitempty"` // Convoy creation time for starvation prevention
// Claiming fields for parallel refinery workers
ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
@@ -102,6 +107,7 @@ func (q *Queue) Submit(mr *MR) error {
}
// List returns all pending MRs, sorted by priority then creation time.
// Deprecated: Use ListByScore for priority-aware ordering.
func (q *Queue) List() ([]*MR, error) {
entries, err := os.ReadDir(q.dir)
if err != nil {
@@ -135,6 +141,43 @@ func (q *Queue) List() ([]*MR, error) {
return mrs, nil
}
// ListByScore returns all pending MRs sorted by priority score (highest first).
// Uses the ScoreMR function which considers:
// - Convoy age (prevents starvation)
// - Issue priority (P0-P4)
// - Retry count (prevents thrashing)
// - MR age (FIFO tiebreaker)
func (q *Queue) ListByScore() ([]*MR, error) {
entries, err := os.ReadDir(q.dir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil // Empty queue
}
return nil, fmt.Errorf("reading mq directory: %w", err)
}
now := time.Now()
var mrs []*MR
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
continue
}
mr, err := q.load(filepath.Join(q.dir, entry.Name()))
if err != nil {
continue // Skip malformed files
}
mrs = append(mrs, mr)
}
// Sort by score (higher first = higher priority)
sort.Slice(mrs, func(i, j int) bool {
return mrs[i].ScoreAt(now) > mrs[j].ScoreAt(now)
})
return mrs, nil
}
// Get retrieves a specific MR by ID.
func (q *Queue) Get(id string) (*MR, error) {
path := filepath.Join(q.dir, id+".json")

View File

@@ -0,0 +1,209 @@
// Package mrqueue provides merge request queue storage and priority scoring.
//
// # MQ Priority Objective Function
//
// The merge queue uses a priority scoring function to determine processing order.
// Higher scores mean higher priority (process first).
//
// ## Scoring Formula
//
// score = BaseScore
// + ConvoyAgeWeight * hoursOld(convoy) // Prevent starvation
// + PriorityWeight * (4 - priority) // P0 > P4
// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing
// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker
//
// ## Default Weights
//
// BaseScore: 1000.0 (keeps all scores positive)
// ConvoyAgeWeight: 10.0 (10 pts/hour = 240 pts/day)
// PriorityWeight: 100.0 (P0=+400, P4=+0)
// RetryPenalty: 50.0 (each retry loses 50 pts)
// MRAgeWeight: 1.0 (1 pt/hour, minor FIFO factor)
// MaxRetryPenalty: 300.0 (caps at 6 retries worth)
//
// ## Design Principles
//
// 1. Deterministic: same inputs always produce same score (uses explicit Now param)
//
// 2. Convoy Starvation Prevention: older convoys escalate in priority. A 48-hour
// old P4 convoy will beat a fresh P0 standalone issue (+480 vs +400).
//
// 3. Priority Respect: within similar convoy ages, P0 issues beat P4 issues.
//
// 4. Thrashing Prevention: MRs that repeatedly fail with conflicts get
// deprioritized, giving the repo state time to stabilize.
//
// 5. FIFO Fairness: within same convoy/priority/retry state, older MRs go first.
//
// ## Example Scores
//
// Fresh P0, no convoy: 1400 (1000 + 400)
// Fresh P4, no convoy: 1000 (1000 + 0)
// Fresh P2, 24h convoy: 1440 (1000 + 200 + 240)
// Fresh P4, 48h convoy: 1480 (1000 + 0 + 480)
// P2, 24h convoy, 3 retries: 1290 (1000 + 200 + 240 - 150)
// P0, no convoy, 6+ retries (capped): 1100 (1000 + 400 - 300)
//
// ## Tuning
//
// All weights are configurable via ScoreConfig. The defaults are designed so:
// - A 48-hour convoy beats any standalone priority (starvation prevention)
// - Priority differences dominate within same convoy
// - Retry penalty is significant but capped (eventual progress guaranteed)
package mrqueue
import (
"time"
)
// ScoreConfig contains tunable weights for MR priority scoring.
// All weights are designed so higher scores = higher priority (process first).
type ScoreConfig struct {
// BaseScore is the starting score before applying factors.
// Default: 1000 (keeps all scores positive)
BaseScore float64
// ConvoyAgeWeight is points added per hour of convoy age.
// Older convoys get priority to prevent starvation.
// Default: 10.0 (10 pts/hour = 240 pts/day)
ConvoyAgeWeight float64
// PriorityWeight is multiplied by (4 - priority) so P0 gets most points.
// P0 adds 4*weight, P1 adds 3*weight, ..., P4 adds 0*weight.
// Default: 100.0 (P0 gets +400, P4 gets +0)
PriorityWeight float64
// RetryPenalty is subtracted per retry attempt to prevent thrashing.
// MRs that keep failing get deprioritized, giving repo state time to stabilize.
// Default: 50.0 (each retry loses 50 pts)
RetryPenalty float64
// MRAgeWeight is points added per hour since MR submission.
// Minor factor for FIFO ordering within same priority/convoy.
// Default: 1.0 (1 pt/hour)
MRAgeWeight float64
// MaxRetryPenalty caps the total retry penalty to prevent permanent deprioritization.
// Default: 300.0 (after 6 retries, penalty is capped)
MaxRetryPenalty float64
}
// DefaultScoreConfig returns sensible defaults for MR scoring.
func DefaultScoreConfig() ScoreConfig {
return ScoreConfig{
BaseScore: 1000.0,
ConvoyAgeWeight: 10.0,
PriorityWeight: 100.0,
RetryPenalty: 50.0,
MRAgeWeight: 1.0,
MaxRetryPenalty: 300.0,
}
}
// ScoreInput contains the data needed to score an MR.
// This struct decouples scoring from the MR struct, allowing the
// caller to provide convoy age from external lookups.
type ScoreInput struct {
// Priority is the issue priority (0=P0/critical, 4=P4/backlog).
Priority int
// MRCreatedAt is when the MR was submitted to the queue.
MRCreatedAt time.Time
// ConvoyCreatedAt is when the convoy was created.
// Nil if MR is not part of a convoy (standalone work).
ConvoyCreatedAt *time.Time
// RetryCount is how many times this MR has been retried after conflicts.
// 0 = first attempt.
RetryCount int
// Now is the current time (for deterministic testing).
// If zero, time.Now() is used.
Now time.Time
}
// ScoreMR calculates the priority score for a merge request.
// Higher scores mean higher priority (process first).
//
// The scoring formula:
//
// score = BaseScore
// + ConvoyAgeWeight * hoursOld(convoy) // Prevent convoy starvation
// + PriorityWeight * (4 - priority) // P0=+400, P4=+0
// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing
// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker
//
// Design principles:
// - Deterministic: same inputs always produce same score
// - Convoy starvation prevention: older convoys escalate in priority
// - Priority respect: P0 bugs beat P4 backlog items
// - Thrashing prevention: repeated failures get deprioritized
// - FIFO fairness: within same convoy/priority, older MRs go first
func ScoreMR(input ScoreInput, config ScoreConfig) float64 {
now := input.Now
if now.IsZero() {
now = time.Now()
}
score := config.BaseScore
// Convoy age factor: prevent starvation of old convoys
if input.ConvoyCreatedAt != nil {
convoyAge := now.Sub(*input.ConvoyCreatedAt)
convoyHours := convoyAge.Hours()
if convoyHours > 0 {
score += config.ConvoyAgeWeight * convoyHours
}
}
// Priority factor: P0 (0) gets +400, P4 (4) gets +0
priorityBonus := 4 - input.Priority
if priorityBonus < 0 {
priorityBonus = 0 // Clamp for invalid priorities > 4
}
if priorityBonus > 4 {
priorityBonus = 4 // Clamp for invalid priorities < 0
}
score += config.PriorityWeight * float64(priorityBonus)
// Retry penalty: prevent thrashing on repeatedly failing MRs
retryPenalty := config.RetryPenalty * float64(input.RetryCount)
if retryPenalty > config.MaxRetryPenalty {
retryPenalty = config.MaxRetryPenalty
}
score -= retryPenalty
// MR age factor: FIFO ordering as tiebreaker
mrAge := now.Sub(input.MRCreatedAt)
mrHours := mrAge.Hours()
if mrHours > 0 {
score += config.MRAgeWeight * mrHours
}
return score
}
// ScoreMRWithDefaults is a convenience wrapper using default config.
func ScoreMRWithDefaults(input ScoreInput) float64 {
return ScoreMR(input, DefaultScoreConfig())
}
// Score calculates the priority score for this MR using default config.
// Higher scores mean higher priority (process first).
func (mr *MR) Score() float64 {
return mr.ScoreAt(time.Now())
}
// ScoreAt calculates the priority score at a specific time (for deterministic testing).
func (mr *MR) ScoreAt(now time.Time) float64 {
input := ScoreInput{
Priority: mr.Priority,
MRCreatedAt: mr.CreatedAt,
ConvoyCreatedAt: mr.ConvoyCreatedAt,
RetryCount: mr.RetryCount,
Now: now,
}
return ScoreMRWithDefaults(input)
}

View File

@@ -0,0 +1,334 @@
package mrqueue
import (
"testing"
"time"
)
func TestScoreMR_BaseScore(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
input := ScoreInput{
Priority: 2, // P2 (medium)
MRCreatedAt: now,
RetryCount: 0,
Now: now,
}
score := ScoreMR(input, config)
// BaseScore(1000) + Priority(2 gives 4-2=2, so 2*100=200) = 1200
expected := 1200.0
if score != expected {
t.Errorf("expected score %f, got %f", expected, score)
}
}
func TestScoreMR_PriorityOrdering(t *testing.T) {
now := time.Now()
tests := []struct {
priority int
expected float64
}{
{0, 1400.0}, // P0: base(1000) + (4-0)*100 = 1400
{1, 1300.0}, // P1: base(1000) + (4-1)*100 = 1300
{2, 1200.0}, // P2: base(1000) + (4-2)*100 = 1200
{3, 1100.0}, // P3: base(1000) + (4-3)*100 = 1100
{4, 1000.0}, // P4: base(1000) + (4-4)*100 = 1000
}
for _, tt := range tests {
t.Run("P"+string(rune('0'+tt.priority)), func(t *testing.T) {
input := ScoreInput{
Priority: tt.priority,
MRCreatedAt: now,
Now: now,
}
score := ScoreMRWithDefaults(input)
if score != tt.expected {
t.Errorf("P%d: expected %f, got %f", tt.priority, tt.expected, score)
}
})
}
// Verify ordering: P0 > P1 > P2 > P3 > P4
for i := 0; i < 4; i++ {
input1 := ScoreInput{Priority: i, MRCreatedAt: now, Now: now}
input2 := ScoreInput{Priority: i + 1, MRCreatedAt: now, Now: now}
score1 := ScoreMRWithDefaults(input1)
score2 := ScoreMRWithDefaults(input2)
if score1 <= score2 {
t.Errorf("P%d (%f) should score higher than P%d (%f)", i, score1, i+1, score2)
}
}
}
func TestScoreMR_ConvoyAgeEscalation(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// MR without convoy
noConvoy := ScoreInput{
Priority: 2,
MRCreatedAt: now,
Now: now,
}
scoreNoConvoy := ScoreMR(noConvoy, config)
// MR with 24-hour old convoy
convoyTime := now.Add(-24 * time.Hour)
withConvoy := ScoreInput{
Priority: 2,
MRCreatedAt: now,
ConvoyCreatedAt: &convoyTime,
Now: now,
}
scoreWithConvoy := ScoreMR(withConvoy, config)
// 24 hours * 10 pts/hour = 240 extra points
expectedDiff := 240.0
actualDiff := scoreWithConvoy - scoreNoConvoy
if actualDiff != expectedDiff {
t.Errorf("expected convoy age to add %f pts, got %f", expectedDiff, actualDiff)
}
}
func TestScoreMR_ConvoyStarvationPrevention(t *testing.T) {
now := time.Now()
// P4 issue in 48-hour old convoy vs P0 issue with no convoy
oldConvoy := now.Add(-48 * time.Hour)
lowPriorityOldConvoy := ScoreInput{
Priority: 4, // P4 (lowest)
MRCreatedAt: now,
ConvoyCreatedAt: &oldConvoy,
Now: now,
}
highPriorityNoConvoy := ScoreInput{
Priority: 0, // P0 (highest)
MRCreatedAt: now,
Now: now,
}
scoreOldConvoy := ScoreMRWithDefaults(lowPriorityOldConvoy)
scoreHighPriority := ScoreMRWithDefaults(highPriorityNoConvoy)
// P4 with 48h convoy: 1000 + 0 + 480 = 1480
// P0 with no convoy: 1000 + 400 + 0 = 1400
// Old convoy should win (starvation prevention)
if scoreOldConvoy <= scoreHighPriority {
t.Errorf("48h old P4 convoy (%f) should beat P0 no convoy (%f) for starvation prevention",
scoreOldConvoy, scoreHighPriority)
}
}
func TestScoreMR_RetryPenalty(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// No retries
noRetry := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 0,
Now: now,
}
scoreNoRetry := ScoreMR(noRetry, config)
// 3 retries
threeRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 3,
Now: now,
}
scoreThreeRetries := ScoreMR(threeRetries, config)
// 3 retries * 50 pts penalty = 150 pts less
expectedDiff := 150.0
actualDiff := scoreNoRetry - scoreThreeRetries
if actualDiff != expectedDiff {
t.Errorf("expected 3 retries to lose %f pts, lost %f", expectedDiff, actualDiff)
}
}
func TestScoreMR_RetryPenaltyCapped(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// Max penalty is 300, so 10 retries should be same as 6
sixRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 6,
Now: now,
}
tenRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 10,
Now: now,
}
scoreSix := ScoreMR(sixRetries, config)
scoreTen := ScoreMR(tenRetries, config)
if scoreSix != scoreTen {
t.Errorf("penalty should be capped: 6 retries (%f) should equal 10 retries (%f)",
scoreSix, scoreTen)
}
// Both should be base(1000) + priority(200) - maxPenalty(300) = 900
expected := 900.0
if scoreSix != expected {
t.Errorf("expected capped score %f, got %f", expected, scoreSix)
}
}
func TestScoreMR_MRAgeAsTiebreaker(t *testing.T) {
now := time.Now()
// Two MRs with same priority, one submitted 10 hours ago
oldMR := ScoreInput{
Priority: 2,
MRCreatedAt: now.Add(-10 * time.Hour),
Now: now,
}
newMR := ScoreInput{
Priority: 2,
MRCreatedAt: now,
Now: now,
}
scoreOld := ScoreMRWithDefaults(oldMR)
scoreNew := ScoreMRWithDefaults(newMR)
// Old MR should have 10 pts more (1 pt/hour)
expectedDiff := 10.0
actualDiff := scoreOld - scoreNew
if actualDiff != expectedDiff {
t.Errorf("older MR should score %f more, got %f", expectedDiff, actualDiff)
}
}
func TestScoreMR_Deterministic(t *testing.T) {
fixedNow := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
convoyTime := time.Date(2024, 12, 31, 12, 0, 0, 0, time.UTC)
mrTime := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC)
input := ScoreInput{
Priority: 1,
MRCreatedAt: mrTime,
ConvoyCreatedAt: &convoyTime,
RetryCount: 2,
Now: fixedNow,
}
// Run 100 times, should always be same
first := ScoreMRWithDefaults(input)
for i := 0; i < 100; i++ {
score := ScoreMRWithDefaults(input)
if score != first {
t.Errorf("score not deterministic: iteration %d got %f, expected %f", i, score, first)
}
}
}
func TestScoreMR_InvalidPriorityClamped(t *testing.T) {
now := time.Now()
// Negative priority should clamp to 0 bonus (priority=4)
negativePriority := ScoreInput{
Priority: -1,
MRCreatedAt: now,
Now: now,
}
scoreNegative := ScoreMRWithDefaults(negativePriority)
// Very high priority should clamp to max bonus (priority=0)
highPriority := ScoreInput{
Priority: 10,
MRCreatedAt: now,
Now: now,
}
scoreHigh := ScoreMRWithDefaults(highPriority)
// Negative priority gets clamped to max bonus (4*100=400)
if scoreNegative != 1400.0 {
t.Errorf("negative priority should clamp to P0 bonus, got %f", scoreNegative)
}
// High priority (10) gives 4-10=-6, clamped to 0
if scoreHigh != 1000.0 {
t.Errorf("priority>4 should give 0 bonus, got %f", scoreHigh)
}
}
func TestMR_Score(t *testing.T) {
now := time.Now()
convoyTime := now.Add(-12 * time.Hour)
mr := &MR{
Priority: 1,
CreatedAt: now.Add(-2 * time.Hour),
ConvoyCreatedAt: &convoyTime,
RetryCount: 1,
}
score := mr.ScoreAt(now)
// base(1000) + convoy(12*10=120) + priority(3*100=300) - retry(1*50=50) + mrAge(2*1=2)
expected := 1000.0 + 120.0 + 300.0 - 50.0 + 2.0
if score != expected {
t.Errorf("MR.ScoreAt expected %f, got %f", expected, score)
}
}
func TestScoreMR_EdgeCases(t *testing.T) {
now := time.Now()
tests := []struct {
name string
input ScoreInput
}{
{
name: "zero time MR",
input: ScoreInput{
Priority: 2,
MRCreatedAt: time.Time{},
Now: now,
},
},
{
name: "future MR",
input: ScoreInput{
Priority: 2,
MRCreatedAt: now.Add(24 * time.Hour),
Now: now,
},
},
{
name: "future convoy",
input: ScoreInput{
Priority: 2,
MRCreatedAt: now,
ConvoyCreatedAt: func() *time.Time { t := now.Add(24 * time.Hour); return &t }(),
Now: now,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Should not panic
score := ScoreMRWithDefaults(tt.input)
// Score should still be reasonable (>= base - maxPenalty)
if score < 700 {
t.Errorf("score %f unexpectedly low for edge case", score)
}
})
}
}