diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go index c2037915..54aedf0e 100644 --- a/internal/mrqueue/mrqueue.go +++ b/internal/mrqueue/mrqueue.go @@ -28,6 +28,11 @@ type MR struct { CreatedAt time.Time `json:"created_at"` AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability) + // Priority scoring fields + RetryCount int `json:"retry_count,omitempty"` // Conflict retry count for priority penalty + ConvoyID string `json:"convoy_id,omitempty"` // Parent convoy ID if part of a convoy + ConvoyCreatedAt *time.Time `json:"convoy_created_at,omitempty"` // Convoy creation time for starvation prevention + // Claiming fields for parallel refinery workers ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed @@ -102,6 +107,7 @@ func (q *Queue) Submit(mr *MR) error { } // List returns all pending MRs, sorted by priority then creation time. +// Deprecated: Use ListByScore for priority-aware ordering. func (q *Queue) List() ([]*MR, error) { entries, err := os.ReadDir(q.dir) if err != nil { @@ -135,6 +141,43 @@ func (q *Queue) List() ([]*MR, error) { return mrs, nil } +// ListByScore returns all pending MRs sorted by priority score (highest first). +// Uses the ScoreMR function which considers: +// - Convoy age (prevents starvation) +// - Issue priority (P0-P4) +// - Retry count (prevents thrashing) +// - MR age (FIFO tiebreaker) +func (q *Queue) ListByScore() ([]*MR, error) { + entries, err := os.ReadDir(q.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil // Empty queue + } + return nil, fmt.Errorf("reading mq directory: %w", err) + } + + now := time.Now() + var mrs []*MR + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { + continue + } + + mr, err := q.load(filepath.Join(q.dir, entry.Name())) + if err != nil { + continue // Skip malformed files + } + mrs = append(mrs, mr) + } + + // Sort by score (higher first = higher priority) + sort.Slice(mrs, func(i, j int) bool { + return mrs[i].ScoreAt(now) > mrs[j].ScoreAt(now) + }) + + return mrs, nil +} + // Get retrieves a specific MR by ID. func (q *Queue) Get(id string) (*MR, error) { path := filepath.Join(q.dir, id+".json") diff --git a/internal/mrqueue/priority.go b/internal/mrqueue/priority.go new file mode 100644 index 00000000..9bed18da --- /dev/null +++ b/internal/mrqueue/priority.go @@ -0,0 +1,209 @@ +// Package mrqueue provides merge request queue storage and priority scoring. +// +// # MQ Priority Objective Function +// +// The merge queue uses a priority scoring function to determine processing order. +// Higher scores mean higher priority (process first). +// +// ## Scoring Formula +// +// score = BaseScore +// + ConvoyAgeWeight * hoursOld(convoy) // Prevent starvation +// + PriorityWeight * (4 - priority) // P0 > P4 +// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing +// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker +// +// ## Default Weights +// +// BaseScore: 1000.0 (keeps all scores positive) +// ConvoyAgeWeight: 10.0 (10 pts/hour = 240 pts/day) +// PriorityWeight: 100.0 (P0=+400, P4=+0) +// RetryPenalty: 50.0 (each retry loses 50 pts) +// MRAgeWeight: 1.0 (1 pt/hour, minor FIFO factor) +// MaxRetryPenalty: 300.0 (caps at 6 retries worth) +// +// ## Design Principles +// +// 1. Deterministic: same inputs always produce same score (uses explicit Now param) +// +// 2. Convoy Starvation Prevention: older convoys escalate in priority. A 48-hour +// old P4 convoy will beat a fresh P0 standalone issue (+480 vs +400). +// +// 3. Priority Respect: within similar convoy ages, P0 issues beat P4 issues. +// +// 4. Thrashing Prevention: MRs that repeatedly fail with conflicts get +// deprioritized, giving the repo state time to stabilize. +// +// 5. FIFO Fairness: within same convoy/priority/retry state, older MRs go first. +// +// ## Example Scores +// +// Fresh P0, no convoy: 1400 (1000 + 400) +// Fresh P4, no convoy: 1000 (1000 + 0) +// Fresh P2, 24h convoy: 1440 (1000 + 200 + 240) +// Fresh P4, 48h convoy: 1480 (1000 + 0 + 480) +// P2, 24h convoy, 3 retries: 1290 (1000 + 200 + 240 - 150) +// P0, no convoy, 6+ retries (capped): 1100 (1000 + 400 - 300) +// +// ## Tuning +// +// All weights are configurable via ScoreConfig. The defaults are designed so: +// - A 48-hour convoy beats any standalone priority (starvation prevention) +// - Priority differences dominate within same convoy +// - Retry penalty is significant but capped (eventual progress guaranteed) +package mrqueue + +import ( + "time" +) + +// ScoreConfig contains tunable weights for MR priority scoring. +// All weights are designed so higher scores = higher priority (process first). +type ScoreConfig struct { + // BaseScore is the starting score before applying factors. + // Default: 1000 (keeps all scores positive) + BaseScore float64 + + // ConvoyAgeWeight is points added per hour of convoy age. + // Older convoys get priority to prevent starvation. + // Default: 10.0 (10 pts/hour = 240 pts/day) + ConvoyAgeWeight float64 + + // PriorityWeight is multiplied by (4 - priority) so P0 gets most points. + // P0 adds 4*weight, P1 adds 3*weight, ..., P4 adds 0*weight. + // Default: 100.0 (P0 gets +400, P4 gets +0) + PriorityWeight float64 + + // RetryPenalty is subtracted per retry attempt to prevent thrashing. + // MRs that keep failing get deprioritized, giving repo state time to stabilize. + // Default: 50.0 (each retry loses 50 pts) + RetryPenalty float64 + + // MRAgeWeight is points added per hour since MR submission. + // Minor factor for FIFO ordering within same priority/convoy. + // Default: 1.0 (1 pt/hour) + MRAgeWeight float64 + + // MaxRetryPenalty caps the total retry penalty to prevent permanent deprioritization. + // Default: 300.0 (after 6 retries, penalty is capped) + MaxRetryPenalty float64 +} + +// DefaultScoreConfig returns sensible defaults for MR scoring. +func DefaultScoreConfig() ScoreConfig { + return ScoreConfig{ + BaseScore: 1000.0, + ConvoyAgeWeight: 10.0, + PriorityWeight: 100.0, + RetryPenalty: 50.0, + MRAgeWeight: 1.0, + MaxRetryPenalty: 300.0, + } +} + +// ScoreInput contains the data needed to score an MR. +// This struct decouples scoring from the MR struct, allowing the +// caller to provide convoy age from external lookups. +type ScoreInput struct { + // Priority is the issue priority (0=P0/critical, 4=P4/backlog). + Priority int + + // MRCreatedAt is when the MR was submitted to the queue. + MRCreatedAt time.Time + + // ConvoyCreatedAt is when the convoy was created. + // Nil if MR is not part of a convoy (standalone work). + ConvoyCreatedAt *time.Time + + // RetryCount is how many times this MR has been retried after conflicts. + // 0 = first attempt. + RetryCount int + + // Now is the current time (for deterministic testing). + // If zero, time.Now() is used. + Now time.Time +} + +// ScoreMR calculates the priority score for a merge request. +// Higher scores mean higher priority (process first). +// +// The scoring formula: +// +// score = BaseScore +// + ConvoyAgeWeight * hoursOld(convoy) // Prevent convoy starvation +// + PriorityWeight * (4 - priority) // P0=+400, P4=+0 +// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing +// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker +// +// Design principles: +// - Deterministic: same inputs always produce same score +// - Convoy starvation prevention: older convoys escalate in priority +// - Priority respect: P0 bugs beat P4 backlog items +// - Thrashing prevention: repeated failures get deprioritized +// - FIFO fairness: within same convoy/priority, older MRs go first +func ScoreMR(input ScoreInput, config ScoreConfig) float64 { + now := input.Now + if now.IsZero() { + now = time.Now() + } + + score := config.BaseScore + + // Convoy age factor: prevent starvation of old convoys + if input.ConvoyCreatedAt != nil { + convoyAge := now.Sub(*input.ConvoyCreatedAt) + convoyHours := convoyAge.Hours() + if convoyHours > 0 { + score += config.ConvoyAgeWeight * convoyHours + } + } + + // Priority factor: P0 (0) gets +400, P4 (4) gets +0 + priorityBonus := 4 - input.Priority + if priorityBonus < 0 { + priorityBonus = 0 // Clamp for invalid priorities > 4 + } + if priorityBonus > 4 { + priorityBonus = 4 // Clamp for invalid priorities < 0 + } + score += config.PriorityWeight * float64(priorityBonus) + + // Retry penalty: prevent thrashing on repeatedly failing MRs + retryPenalty := config.RetryPenalty * float64(input.RetryCount) + if retryPenalty > config.MaxRetryPenalty { + retryPenalty = config.MaxRetryPenalty + } + score -= retryPenalty + + // MR age factor: FIFO ordering as tiebreaker + mrAge := now.Sub(input.MRCreatedAt) + mrHours := mrAge.Hours() + if mrHours > 0 { + score += config.MRAgeWeight * mrHours + } + + return score +} + +// ScoreMRWithDefaults is a convenience wrapper using default config. +func ScoreMRWithDefaults(input ScoreInput) float64 { + return ScoreMR(input, DefaultScoreConfig()) +} + +// Score calculates the priority score for this MR using default config. +// Higher scores mean higher priority (process first). +func (mr *MR) Score() float64 { + return mr.ScoreAt(time.Now()) +} + +// ScoreAt calculates the priority score at a specific time (for deterministic testing). +func (mr *MR) ScoreAt(now time.Time) float64 { + input := ScoreInput{ + Priority: mr.Priority, + MRCreatedAt: mr.CreatedAt, + ConvoyCreatedAt: mr.ConvoyCreatedAt, + RetryCount: mr.RetryCount, + Now: now, + } + return ScoreMRWithDefaults(input) +} diff --git a/internal/mrqueue/priority_test.go b/internal/mrqueue/priority_test.go new file mode 100644 index 00000000..fd71b745 --- /dev/null +++ b/internal/mrqueue/priority_test.go @@ -0,0 +1,334 @@ +package mrqueue + +import ( + "testing" + "time" +) + +func TestScoreMR_BaseScore(t *testing.T) { + now := time.Now() + config := DefaultScoreConfig() + + input := ScoreInput{ + Priority: 2, // P2 (medium) + MRCreatedAt: now, + RetryCount: 0, + Now: now, + } + + score := ScoreMR(input, config) + + // BaseScore(1000) + Priority(2 gives 4-2=2, so 2*100=200) = 1200 + expected := 1200.0 + if score != expected { + t.Errorf("expected score %f, got %f", expected, score) + } +} + +func TestScoreMR_PriorityOrdering(t *testing.T) { + now := time.Now() + + tests := []struct { + priority int + expected float64 + }{ + {0, 1400.0}, // P0: base(1000) + (4-0)*100 = 1400 + {1, 1300.0}, // P1: base(1000) + (4-1)*100 = 1300 + {2, 1200.0}, // P2: base(1000) + (4-2)*100 = 1200 + {3, 1100.0}, // P3: base(1000) + (4-3)*100 = 1100 + {4, 1000.0}, // P4: base(1000) + (4-4)*100 = 1000 + } + + for _, tt := range tests { + t.Run("P"+string(rune('0'+tt.priority)), func(t *testing.T) { + input := ScoreInput{ + Priority: tt.priority, + MRCreatedAt: now, + Now: now, + } + score := ScoreMRWithDefaults(input) + if score != tt.expected { + t.Errorf("P%d: expected %f, got %f", tt.priority, tt.expected, score) + } + }) + } + + // Verify ordering: P0 > P1 > P2 > P3 > P4 + for i := 0; i < 4; i++ { + input1 := ScoreInput{Priority: i, MRCreatedAt: now, Now: now} + input2 := ScoreInput{Priority: i + 1, MRCreatedAt: now, Now: now} + score1 := ScoreMRWithDefaults(input1) + score2 := ScoreMRWithDefaults(input2) + if score1 <= score2 { + t.Errorf("P%d (%f) should score higher than P%d (%f)", i, score1, i+1, score2) + } + } +} + +func TestScoreMR_ConvoyAgeEscalation(t *testing.T) { + now := time.Now() + config := DefaultScoreConfig() + + // MR without convoy + noConvoy := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + Now: now, + } + scoreNoConvoy := ScoreMR(noConvoy, config) + + // MR with 24-hour old convoy + convoyTime := now.Add(-24 * time.Hour) + withConvoy := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + ConvoyCreatedAt: &convoyTime, + Now: now, + } + scoreWithConvoy := ScoreMR(withConvoy, config) + + // 24 hours * 10 pts/hour = 240 extra points + expectedDiff := 240.0 + actualDiff := scoreWithConvoy - scoreNoConvoy + if actualDiff != expectedDiff { + t.Errorf("expected convoy age to add %f pts, got %f", expectedDiff, actualDiff) + } +} + +func TestScoreMR_ConvoyStarvationPrevention(t *testing.T) { + now := time.Now() + + // P4 issue in 48-hour old convoy vs P0 issue with no convoy + oldConvoy := now.Add(-48 * time.Hour) + lowPriorityOldConvoy := ScoreInput{ + Priority: 4, // P4 (lowest) + MRCreatedAt: now, + ConvoyCreatedAt: &oldConvoy, + Now: now, + } + + highPriorityNoConvoy := ScoreInput{ + Priority: 0, // P0 (highest) + MRCreatedAt: now, + Now: now, + } + + scoreOldConvoy := ScoreMRWithDefaults(lowPriorityOldConvoy) + scoreHighPriority := ScoreMRWithDefaults(highPriorityNoConvoy) + + // P4 with 48h convoy: 1000 + 0 + 480 = 1480 + // P0 with no convoy: 1000 + 400 + 0 = 1400 + // Old convoy should win (starvation prevention) + if scoreOldConvoy <= scoreHighPriority { + t.Errorf("48h old P4 convoy (%f) should beat P0 no convoy (%f) for starvation prevention", + scoreOldConvoy, scoreHighPriority) + } +} + +func TestScoreMR_RetryPenalty(t *testing.T) { + now := time.Now() + config := DefaultScoreConfig() + + // No retries + noRetry := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + RetryCount: 0, + Now: now, + } + scoreNoRetry := ScoreMR(noRetry, config) + + // 3 retries + threeRetries := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + RetryCount: 3, + Now: now, + } + scoreThreeRetries := ScoreMR(threeRetries, config) + + // 3 retries * 50 pts penalty = 150 pts less + expectedDiff := 150.0 + actualDiff := scoreNoRetry - scoreThreeRetries + if actualDiff != expectedDiff { + t.Errorf("expected 3 retries to lose %f pts, lost %f", expectedDiff, actualDiff) + } +} + +func TestScoreMR_RetryPenaltyCapped(t *testing.T) { + now := time.Now() + config := DefaultScoreConfig() + + // Max penalty is 300, so 10 retries should be same as 6 + sixRetries := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + RetryCount: 6, + Now: now, + } + tenRetries := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + RetryCount: 10, + Now: now, + } + + scoreSix := ScoreMR(sixRetries, config) + scoreTen := ScoreMR(tenRetries, config) + + if scoreSix != scoreTen { + t.Errorf("penalty should be capped: 6 retries (%f) should equal 10 retries (%f)", + scoreSix, scoreTen) + } + + // Both should be base(1000) + priority(200) - maxPenalty(300) = 900 + expected := 900.0 + if scoreSix != expected { + t.Errorf("expected capped score %f, got %f", expected, scoreSix) + } +} + +func TestScoreMR_MRAgeAsTiebreaker(t *testing.T) { + now := time.Now() + + // Two MRs with same priority, one submitted 10 hours ago + oldMR := ScoreInput{ + Priority: 2, + MRCreatedAt: now.Add(-10 * time.Hour), + Now: now, + } + newMR := ScoreInput{ + Priority: 2, + MRCreatedAt: now, + Now: now, + } + + scoreOld := ScoreMRWithDefaults(oldMR) + scoreNew := ScoreMRWithDefaults(newMR) + + // Old MR should have 10 pts more (1 pt/hour) + expectedDiff := 10.0 + actualDiff := scoreOld - scoreNew + if actualDiff != expectedDiff { + t.Errorf("older MR should score %f more, got %f", expectedDiff, actualDiff) + } +} + +func TestScoreMR_Deterministic(t *testing.T) { + fixedNow := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) + convoyTime := time.Date(2024, 12, 31, 12, 0, 0, 0, time.UTC) + mrTime := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC) + + input := ScoreInput{ + Priority: 1, + MRCreatedAt: mrTime, + ConvoyCreatedAt: &convoyTime, + RetryCount: 2, + Now: fixedNow, + } + + // Run 100 times, should always be same + first := ScoreMRWithDefaults(input) + for i := 0; i < 100; i++ { + score := ScoreMRWithDefaults(input) + if score != first { + t.Errorf("score not deterministic: iteration %d got %f, expected %f", i, score, first) + } + } +} + +func TestScoreMR_InvalidPriorityClamped(t *testing.T) { + now := time.Now() + + // Negative priority should clamp to 0 bonus (priority=4) + negativePriority := ScoreInput{ + Priority: -1, + MRCreatedAt: now, + Now: now, + } + scoreNegative := ScoreMRWithDefaults(negativePriority) + + // Very high priority should clamp to max bonus (priority=0) + highPriority := ScoreInput{ + Priority: 10, + MRCreatedAt: now, + Now: now, + } + scoreHigh := ScoreMRWithDefaults(highPriority) + + // Negative priority gets clamped to max bonus (4*100=400) + if scoreNegative != 1400.0 { + t.Errorf("negative priority should clamp to P0 bonus, got %f", scoreNegative) + } + + // High priority (10) gives 4-10=-6, clamped to 0 + if scoreHigh != 1000.0 { + t.Errorf("priority>4 should give 0 bonus, got %f", scoreHigh) + } +} + +func TestMR_Score(t *testing.T) { + now := time.Now() + convoyTime := now.Add(-12 * time.Hour) + + mr := &MR{ + Priority: 1, + CreatedAt: now.Add(-2 * time.Hour), + ConvoyCreatedAt: &convoyTime, + RetryCount: 1, + } + + score := mr.ScoreAt(now) + + // base(1000) + convoy(12*10=120) + priority(3*100=300) - retry(1*50=50) + mrAge(2*1=2) + expected := 1000.0 + 120.0 + 300.0 - 50.0 + 2.0 + if score != expected { + t.Errorf("MR.ScoreAt expected %f, got %f", expected, score) + } +} + +func TestScoreMR_EdgeCases(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + input ScoreInput + }{ + { + name: "zero time MR", + input: ScoreInput{ + Priority: 2, + MRCreatedAt: time.Time{}, + Now: now, + }, + }, + { + name: "future MR", + input: ScoreInput{ + Priority: 2, + MRCreatedAt: now.Add(24 * time.Hour), + Now: now, + }, + }, + { + name: "future convoy", + input: ScoreInput{ + Priority: 2, + MRCreatedAt: now, + ConvoyCreatedAt: func() *time.Time { t := now.Add(24 * time.Hour); return &t }(), + Now: now, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Should not panic + score := ScoreMRWithDefaults(tt.input) + // Score should still be reasonable (>= base - maxPenalty) + if score < 700 { + t.Errorf("score %f unexpectedly low for edge case", score) + } + }) + } +}