Files
gastown/internal/mrqueue/mrqueue.go
furiosa 17fd366888 feat(mrqueue): Add MQ priority objective function (gt-si8rq.1)
Implement ScoreMR function for merge queue priority ordering with:
- Convoy age factor (prevents starvation of old convoys)
- Priority factor (P0 beats P4)
- Retry penalty (prevents thrashing on conflict-prone MRs)
- MR age tiebreaker (FIFO within same priority)

Added fields to MR struct:
- RetryCount for conflict retry tracking
- ConvoyID and ConvoyCreatedAt for convoy linkage

Includes comprehensive unit tests and documentation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 01:28:27 -08:00

357 lines
9.2 KiB
Go

// Package mrqueue provides merge request queue storage.
// MRs are stored locally in .beads/mq/ and deleted after merge.
// This avoids sync overhead for transient MR state.
package mrqueue
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// MR represents a merge request in the queue.
type MR struct {
ID string `json:"id"`
Branch string `json:"branch"` // Source branch (e.g., "polecat/nux")
Target string `json:"target"` // Target branch (e.g., "main")
SourceIssue string `json:"source_issue"` // The work item being merged
Worker string `json:"worker"` // Who did the work
Rig string `json:"rig"` // Which rig
Title string `json:"title"` // MR title
Priority int `json:"priority"` // Priority (lower = higher priority)
CreatedAt time.Time `json:"created_at"`
AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability)
// Priority scoring fields
RetryCount int `json:"retry_count,omitempty"` // Conflict retry count for priority penalty
ConvoyID string `json:"convoy_id,omitempty"` // Parent convoy ID if part of a convoy
ConvoyCreatedAt *time.Time `json:"convoy_created_at,omitempty"` // Convoy creation time for starvation prevention
// Claiming fields for parallel refinery workers
ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
}
// Queue manages the MR storage.
type Queue struct {
dir string // .beads/mq/ directory
}
// New creates a new MR queue for the given rig path.
func New(rigPath string) *Queue {
return &Queue{
dir: filepath.Join(rigPath, ".beads", "mq"),
}
}
// NewFromWorkdir creates a queue by finding the rig root from a working directory.
func NewFromWorkdir(workdir string) (*Queue, error) {
// Walk up to find .beads or rig root
dir := workdir
for {
beadsDir := filepath.Join(dir, ".beads")
if info, err := os.Stat(beadsDir); err == nil && info.IsDir() {
return &Queue{dir: filepath.Join(beadsDir, "mq")}, nil
}
parent := filepath.Dir(dir)
if parent == dir {
return nil, fmt.Errorf("could not find .beads directory from %s", workdir)
}
dir = parent
}
}
// EnsureDir creates the MQ directory if it doesn't exist.
func (q *Queue) EnsureDir() error {
return os.MkdirAll(q.dir, 0755)
}
// generateID creates a unique MR ID.
func generateID() string {
b := make([]byte, 4)
rand.Read(b)
return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(b))
}
// Submit adds a new MR to the queue.
func (q *Queue) Submit(mr *MR) error {
if err := q.EnsureDir(); err != nil {
return fmt.Errorf("creating mq directory: %w", err)
}
if mr.ID == "" {
mr.ID = generateID()
}
if mr.CreatedAt.IsZero() {
mr.CreatedAt = time.Now()
}
data, err := json.MarshalIndent(mr, "", " ")
if err != nil {
return fmt.Errorf("marshaling MR: %w", err)
}
path := filepath.Join(q.dir, mr.ID+".json")
if err := os.WriteFile(path, data, 0644); err != nil {
return fmt.Errorf("writing MR file: %w", err)
}
return nil
}
// List returns all pending MRs, sorted by priority then creation time.
// Deprecated: Use ListByScore for priority-aware ordering.
func (q *Queue) List() ([]*MR, error) {
entries, err := os.ReadDir(q.dir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil // Empty queue
}
return nil, fmt.Errorf("reading mq directory: %w", err)
}
var mrs []*MR
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
continue
}
mr, err := q.load(filepath.Join(q.dir, entry.Name()))
if err != nil {
continue // Skip malformed files
}
mrs = append(mrs, mr)
}
// Sort by priority (lower first), then by creation time (older first)
sort.Slice(mrs, func(i, j int) bool {
if mrs[i].Priority != mrs[j].Priority {
return mrs[i].Priority < mrs[j].Priority
}
return mrs[i].CreatedAt.Before(mrs[j].CreatedAt)
})
return mrs, nil
}
// ListByScore returns all pending MRs sorted by priority score (highest first).
// Uses the ScoreMR function which considers:
// - Convoy age (prevents starvation)
// - Issue priority (P0-P4)
// - Retry count (prevents thrashing)
// - MR age (FIFO tiebreaker)
func (q *Queue) ListByScore() ([]*MR, error) {
entries, err := os.ReadDir(q.dir)
if err != nil {
if os.IsNotExist(err) {
return nil, nil // Empty queue
}
return nil, fmt.Errorf("reading mq directory: %w", err)
}
now := time.Now()
var mrs []*MR
for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
continue
}
mr, err := q.load(filepath.Join(q.dir, entry.Name()))
if err != nil {
continue // Skip malformed files
}
mrs = append(mrs, mr)
}
// Sort by score (higher first = higher priority)
sort.Slice(mrs, func(i, j int) bool {
return mrs[i].ScoreAt(now) > mrs[j].ScoreAt(now)
})
return mrs, nil
}
// Get retrieves a specific MR by ID.
func (q *Queue) Get(id string) (*MR, error) {
path := filepath.Join(q.dir, id+".json")
return q.load(path)
}
// load reads an MR from a file path.
func (q *Queue) load(path string) (*MR, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var mr MR
if err := json.Unmarshal(data, &mr); err != nil {
return nil, err
}
return &mr, nil
}
// Remove deletes an MR from the queue (after successful merge).
func (q *Queue) Remove(id string) error {
path := filepath.Join(q.dir, id+".json")
err := os.Remove(path)
if os.IsNotExist(err) {
return nil // Already removed
}
return err
}
// Count returns the number of pending MRs.
func (q *Queue) Count() int {
entries, err := os.ReadDir(q.dir)
if err != nil {
return 0
}
count := 0
for _, entry := range entries {
if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") {
count++
}
}
return count
}
// Dir returns the queue directory path.
func (q *Queue) Dir() string {
return q.dir
}
// ClaimStaleTimeout is how long before a claimed MR is considered stale.
// If a worker claims an MR but doesn't process it within this time,
// another worker can reclaim it.
const ClaimStaleTimeout = 10 * time.Minute
// Claim attempts to claim an MR for processing by a specific worker.
// Returns nil if successful, ErrAlreadyClaimed if another worker has it,
// or ErrNotFound if the MR doesn't exist.
// Uses atomic file operations to prevent race conditions.
func (q *Queue) Claim(id, workerID string) error {
path := filepath.Join(q.dir, id+".json")
// Read current state
mr, err := q.load(path)
if err != nil {
if os.IsNotExist(err) {
return ErrNotFound
}
return fmt.Errorf("loading MR: %w", err)
}
// Check if already claimed by another worker
if mr.ClaimedBy != "" && mr.ClaimedBy != workerID {
// Check if claim is stale (worker may have crashed)
if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
return ErrAlreadyClaimed
}
// Stale claim - allow reclaim
}
// Claim the MR
now := time.Now()
mr.ClaimedBy = workerID
mr.ClaimedAt = &now
// Write atomically
data, err := json.MarshalIndent(mr, "", " ")
if err != nil {
return fmt.Errorf("marshaling MR: %w", err)
}
// Write to temp file first, then rename (atomic on most filesystems)
tmpPath := path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0644); err != nil {
return fmt.Errorf("writing temp file: %w", err)
}
if err := os.Rename(tmpPath, path); err != nil {
os.Remove(tmpPath) // cleanup
return fmt.Errorf("renaming temp file: %w", err)
}
return nil
}
// Release releases a claimed MR back to the queue.
// Called when processing fails and the MR should be retried.
func (q *Queue) Release(id string) error {
path := filepath.Join(q.dir, id+".json")
mr, err := q.load(path)
if err != nil {
if os.IsNotExist(err) {
return nil // Already removed
}
return fmt.Errorf("loading MR: %w", err)
}
// Clear claim
mr.ClaimedBy = ""
mr.ClaimedAt = nil
data, err := json.MarshalIndent(mr, "", " ")
if err != nil {
return fmt.Errorf("marshaling MR: %w", err)
}
return os.WriteFile(path, data, 0644)
}
// ListUnclaimed returns MRs that are not claimed or have stale claims.
// Sorted by priority then creation time.
func (q *Queue) ListUnclaimed() ([]*MR, error) {
all, err := q.List()
if err != nil {
return nil, err
}
var unclaimed []*MR
for _, mr := range all {
if mr.ClaimedBy == "" {
unclaimed = append(unclaimed, mr)
continue
}
// Check if claim is stale
if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) >= ClaimStaleTimeout {
unclaimed = append(unclaimed, mr)
}
}
return unclaimed, nil
}
// ListClaimedBy returns MRs claimed by a specific worker.
func (q *Queue) ListClaimedBy(workerID string) ([]*MR, error) {
all, err := q.List()
if err != nil {
return nil, err
}
var claimed []*MR
for _, mr := range all {
if mr.ClaimedBy == workerID {
claimed = append(claimed, mr)
}
}
return claimed, nil
}
// Common errors for claiming
var (
ErrNotFound = fmt.Errorf("merge request not found")
ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker")
)