refactor(mrqueue): remove mrqueue package, use beads for MRs (gt-dqi)

Remove the mrqueue side-channel from gastown. The merge queue now uses
beads merge-request wisps exclusively, not parallel .beads/mq/*.json files.

Changes:
- Delete internal/mrqueue/ package (~830 lines removed)
- Move scoring logic to internal/refinery/score.go
- Update Refinery engineer to query beads via ReadyWithType("merge-request")
- Add MRInfo struct to replace mrqueue.MR
- Add ClaimMR/ReleaseMR methods using beads assignee field
- Update HandleMergeReady to not create duplicate queue entries
- Update gt refinery commands (claim, release, unclaimed) to use beads
- Stub out MQEventSource (no longer needed)

The Refinery now:
- Lists MRs via beads.ReadyWithType("merge-request")
- Claims via beads.Update(..., {Assignee: worker})
- Closes via beads.CloseWithReason("merged", mrID)
- Blocks on conflicts via beads.AddDependency(mrID, taskID)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
george
2026-01-12 23:48:56 -08:00
committed by Steve Yegge
parent f0192c8b3d
commit 58207a00ec
12 changed files with 312 additions and 1556 deletions

View File

@@ -10,7 +10,7 @@ import (
"github.com/spf13/cobra"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/refinery"
"github.com/steveyegge/gastown/internal/style"
)
@@ -260,7 +260,7 @@ func outputJSON(data interface{}) error {
return enc.Encode(data)
}
// calculateMRScore computes the priority score for an MR using the mrqueue scoring function.
// calculateMRScore computes the priority score for an MR using the refinery scoring function.
// Higher scores mean higher priority (process first).
func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time) float64 {
// Parse MR creation time
@@ -273,7 +273,7 @@ func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time)
}
// Build score input
input := mrqueue.ScoreInput{
input := refinery.ScoreInput{
Priority: issue.Priority,
MRCreatedAt: mrCreatedAt,
Now: now,
@@ -291,5 +291,5 @@ func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time)
}
}
return mrqueue.ScoreMRWithDefaults(input)
return refinery.ScoreMRWithDefaults(input)
}

View File

@@ -6,7 +6,7 @@ import (
"os"
"github.com/spf13/cobra"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/refinery"
"github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/style"
@@ -540,19 +540,23 @@ func runRefineryClaim(cmd *cobra.Command, args []string) error {
mrID := args[0]
workerID := getWorkerID()
// Find the queue from current working directory
q, err := mrqueue.NewFromWorkdir(".")
// Find beads from current working directory
townRoot, err := workspace.FindFromCwdOrError()
if err != nil {
return fmt.Errorf("finding merge queue: %w", err)
return fmt.Errorf("not in a Gas Town workspace: %w", err)
}
rigName, err := inferRigFromCwd(townRoot)
if err != nil {
return fmt.Errorf("could not determine rig: %w", err)
}
if err := q.Claim(mrID, workerID); err != nil {
if err == mrqueue.ErrNotFound {
return fmt.Errorf("MR %s not found in queue", mrID)
}
if err == mrqueue.ErrAlreadyClaimed {
return fmt.Errorf("MR %s is already claimed by another worker", mrID)
}
_, r, err := getRig(rigName)
if err != nil {
return err
}
eng := refinery.NewEngineer(r)
if err := eng.ClaimMR(mrID, workerID); err != nil {
return fmt.Errorf("claiming MR: %w", err)
}
@@ -563,12 +567,23 @@ func runRefineryClaim(cmd *cobra.Command, args []string) error {
func runRefineryRelease(cmd *cobra.Command, args []string) error {
mrID := args[0]
q, err := mrqueue.NewFromWorkdir(".")
// Find beads from current working directory
townRoot, err := workspace.FindFromCwdOrError()
if err != nil {
return fmt.Errorf("finding merge queue: %w", err)
return fmt.Errorf("not in a Gas Town workspace: %w", err)
}
rigName, err := inferRigFromCwd(townRoot)
if err != nil {
return fmt.Errorf("could not determine rig: %w", err)
}
if err := q.Release(mrID); err != nil {
_, r, err := getRig(rigName)
if err != nil {
return err
}
eng := refinery.NewEngineer(r)
if err := eng.ReleaseMR(mrID); err != nil {
return fmt.Errorf("releasing MR: %w", err)
}
@@ -587,10 +602,35 @@ func runRefineryUnclaimed(cmd *cobra.Command, args []string) error {
return err
}
q := mrqueue.New(r.Path)
unclaimed, err := q.ListUnclaimed()
// Query beads for merge-request issues without assignee
b := beads.New(r.Path)
issues, err := b.List(beads.ListOptions{
Status: "open",
Label: "gt:merge-request",
Priority: -1,
})
if err != nil {
return fmt.Errorf("listing unclaimed MRs: %w", err)
return fmt.Errorf("listing merge requests: %w", err)
}
// Filter for unclaimed (no assignee)
var unclaimed []*refinery.MRInfo
for _, issue := range issues {
if issue.Assignee != "" {
continue
}
fields := beads.ParseMRFields(issue)
if fields == nil {
continue
}
mr := &refinery.MRInfo{
ID: issue.ID,
Branch: fields.Branch,
Target: fields.Target,
Worker: fields.Worker,
Priority: issue.Priority,
}
unclaimed = append(unclaimed, mr)
}
// JSON output

View File

@@ -1,152 +0,0 @@
// Package mrqueue provides merge request queue storage and events.
package mrqueue
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// EventType represents the type of MQ lifecycle event.
type EventType string
const (
// EventMergeStarted indicates refinery began processing an MR.
EventMergeStarted EventType = "merge_started"
// EventMerged indicates an MR was successfully merged.
EventMerged EventType = "merged"
// EventMergeFailed indicates a merge failed (conflict, tests, etc.).
EventMergeFailed EventType = "merge_failed"
// EventMergeSkipped indicates an MR was skipped (already merged, etc.).
EventMergeSkipped EventType = "merge_skipped"
)
// Event represents a single MQ lifecycle event.
type Event struct {
Timestamp time.Time `json:"timestamp"`
Type EventType `json:"type"`
MRID string `json:"mr_id"`
Branch string `json:"branch"`
Target string `json:"target"`
Worker string `json:"worker,omitempty"`
SourceIssue string `json:"source_issue,omitempty"`
Rig string `json:"rig,omitempty"`
MergeCommit string `json:"merge_commit,omitempty"` // For merged events
Reason string `json:"reason,omitempty"` // For failed/skipped events
}
// EventLogger handles writing MQ events to the event log.
type EventLogger struct {
logPath string
mu sync.Mutex
}
// NewEventLogger returns an EventLogger that appends events to the
// mq_events.jsonl file inside the given beads directory.
func NewEventLogger(beadsDir string) *EventLogger {
	logger := &EventLogger{}
	logger.logPath = filepath.Join(beadsDir, "mq_events.jsonl")
	return logger
}
// NewEventLoggerFromRig builds an EventLogger rooted at the rig's
// .beads directory.
func NewEventLoggerFromRig(rigPath string) *EventLogger {
	beadsDir := filepath.Join(rigPath, ".beads")
	return NewEventLogger(beadsDir)
}
// LogEvent appends a single event to the MQ event log as one JSON line.
// Safe for concurrent use: the logger's mutex serializes writers.
func (l *EventLogger) LogEvent(event Event) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	// Stamp the event if the caller left the timestamp zero.
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now()
	}
	// The log directory may not exist yet; create it on demand.
	if err := os.MkdirAll(filepath.Dir(l.logPath), 0755); err != nil {
		return fmt.Errorf("creating log directory: %w", err)
	}
	payload, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("marshaling event: %w", err)
	}
	// Open in append mode so each event becomes a new JSONL record.
	f, err := os.OpenFile(l.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
	if err != nil {
		return fmt.Errorf("opening event log: %w", err)
	}
	defer f.Close()
	record := append(payload, '\n')
	if _, err := f.Write(record); err != nil {
		return fmt.Errorf("writing event: %w", err)
	}
	return nil
}
// LogMergeStarted records that the refinery began processing mr.
func (l *EventLogger) LogMergeStarted(mr *MR) error {
	ev := Event{
		Type:        EventMergeStarted,
		MRID:        mr.ID,
		Branch:      mr.Branch,
		Target:      mr.Target,
		Worker:      mr.Worker,
		SourceIssue: mr.SourceIssue,
		Rig:         mr.Rig,
	}
	return l.LogEvent(ev)
}
// LogMerged records a successful merge of mr at mergeCommit.
func (l *EventLogger) LogMerged(mr *MR, mergeCommit string) error {
	ev := Event{
		Type:        EventMerged,
		MRID:        mr.ID,
		Branch:      mr.Branch,
		Target:      mr.Target,
		Worker:      mr.Worker,
		SourceIssue: mr.SourceIssue,
		Rig:         mr.Rig,
		MergeCommit: mergeCommit,
	}
	return l.LogEvent(ev)
}
// LogMergeFailed records a failed merge of mr with a human-readable reason.
func (l *EventLogger) LogMergeFailed(mr *MR, reason string) error {
	ev := Event{
		Type:        EventMergeFailed,
		MRID:        mr.ID,
		Branch:      mr.Branch,
		Target:      mr.Target,
		Worker:      mr.Worker,
		SourceIssue: mr.SourceIssue,
		Rig:         mr.Rig,
		Reason:      reason,
	}
	return l.LogEvent(ev)
}
// LogMergeSkipped records that mr was skipped, with a reason
// (e.g. already merged).
func (l *EventLogger) LogMergeSkipped(mr *MR, reason string) error {
	ev := Event{
		Type:        EventMergeSkipped,
		MRID:        mr.ID,
		Branch:      mr.Branch,
		Target:      mr.Target,
		Worker:      mr.Worker,
		SourceIssue: mr.SourceIssue,
		Rig:         mr.Rig,
		Reason:      reason,
	}
	return l.LogEvent(ev)
}
// LogPath reports where events are written (the JSONL file path).
func (l *EventLogger) LogPath() string {
	path := l.logPath
	return path
}

View File

@@ -1,114 +0,0 @@
package mrqueue
import (
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"
)
// TestEventLogger exercises the full logger round trip: it writes one event
// of each type to a temp .beads directory, then reads the JSONL file back
// and checks type, MR ID, branch, and timestamp recency per line.
func TestEventLogger(t *testing.T) {
	// Create temp directory
	tmpDir, err := os.MkdirTemp("", "mrqueue-test")
	if err != nil {
		t.Fatalf("Failed to create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	beadsDir := filepath.Join(tmpDir, ".beads")
	if err := os.MkdirAll(beadsDir, 0755); err != nil {
		t.Fatalf("Failed to create beads dir: %v", err)
	}
	logger := NewEventLogger(beadsDir)
	// Test MR shared by all four log calls below.
	mr := &MR{
		ID:          "mr-test-123",
		Branch:      "polecat/test",
		Target:      "main",
		SourceIssue: "gt-abc",
		Worker:      "test-worker",
		Rig:         "test-rig",
	}
	// Log merge_started
	if err := logger.LogMergeStarted(mr); err != nil {
		t.Errorf("LogMergeStarted failed: %v", err)
	}
	// Log merged
	if err := logger.LogMerged(mr, "abc123def456"); err != nil {
		t.Errorf("LogMerged failed: %v", err)
	}
	// Log merge_failed
	if err := logger.LogMergeFailed(mr, "conflict in file.go"); err != nil {
		t.Errorf("LogMergeFailed failed: %v", err)
	}
	// Log merge_skipped
	if err := logger.LogMergeSkipped(mr, "already merged"); err != nil {
		t.Errorf("LogMergeSkipped failed: %v", err)
	}
	// Read and verify events: one JSON object per line, in write order.
	logPath := logger.LogPath()
	data, err := os.ReadFile(logPath)
	if err != nil {
		t.Fatalf("Failed to read log file: %v", err)
	}
	lines := splitLines(string(data))
	if len(lines) != 4 {
		t.Errorf("Expected 4 events, got %d", len(lines))
	}
	// Verify each event type; order must mirror the log calls above.
	expectedTypes := []EventType{EventMergeStarted, EventMerged, EventMergeFailed, EventMergeSkipped}
	for i, line := range lines {
		if line == "" {
			continue
		}
		var event Event
		if err := json.Unmarshal([]byte(line), &event); err != nil {
			t.Errorf("Failed to parse event %d: %v", i, err)
			continue
		}
		if event.Type != expectedTypes[i] {
			t.Errorf("Event %d: expected type %s, got %s", i, expectedTypes[i], event.Type)
		}
		if event.MRID != mr.ID {
			t.Errorf("Event %d: expected MR ID %s, got %s", i, mr.ID, event.MRID)
		}
		if event.Branch != mr.Branch {
			t.Errorf("Event %d: expected branch %s, got %s", i, mr.Branch, event.Branch)
		}
		// Check timestamp is recent (LogEvent stamped it at write time).
		if time.Since(event.Timestamp) > time.Minute {
			t.Errorf("Event %d: timestamp too old: %v", i, event.Timestamp)
		}
	}
}
func splitLines(s string) []string {
var lines []string
start := 0
for i := 0; i < len(s); i++ {
if s[i] == '\n' {
if start < i {
lines = append(lines, s[start:i])
}
start = i + 1
}
}
if start < len(s) {
lines = append(lines, s[start:])
}
return lines
}

View File

@@ -1,469 +0,0 @@
// Package mrqueue provides merge request queue storage.
// MRs are stored locally in .beads/mq/ and deleted after merge.
// This avoids sync overhead for transient MR state.
package mrqueue
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
)
// MR represents a merge request in the queue.
type MR struct {
ID string `json:"id"`
Branch string `json:"branch"` // Source branch (e.g., "polecat/nux")
Target string `json:"target"` // Target branch (e.g., "main")
SourceIssue string `json:"source_issue"` // The work item being merged
Worker string `json:"worker"` // Who did the work
Rig string `json:"rig"` // Which rig
Title string `json:"title"` // MR title
Priority int `json:"priority"` // Priority (lower = higher priority)
CreatedAt time.Time `json:"created_at"`
AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability)
// Priority scoring fields
RetryCount int `json:"retry_count,omitempty"` // Conflict retry count for priority penalty
ConvoyID string `json:"convoy_id,omitempty"` // Parent convoy ID if part of a convoy
ConvoyCreatedAt *time.Time `json:"convoy_created_at,omitempty"` // Convoy creation time for starvation prevention
// Claiming fields for parallel refinery workers
ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR
ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed
// Blocking fields for non-blocking delegation
BlockedBy string `json:"blocked_by,omitempty"` // Task ID that blocks this MR (e.g., conflict resolution task)
}
// Queue manages the MR storage.
type Queue struct {
dir string // .beads/mq/ directory
}
// New returns a Queue backed by <rigPath>/.beads/mq.
func New(rigPath string) *Queue {
	mqDir := filepath.Join(rigPath, ".beads", "mq")
	return &Queue{dir: mqDir}
}
// NewFromWorkdir locates the rig by walking up from workdir until a
// .beads directory is found, then returns a Queue rooted at its mq/
// subdirectory. Errors if the filesystem root is reached first.
func NewFromWorkdir(workdir string) (*Queue, error) {
	for dir := workdir; ; {
		candidate := filepath.Join(dir, ".beads")
		if info, err := os.Stat(candidate); err == nil && info.IsDir() {
			return &Queue{dir: filepath.Join(candidate, "mq")}, nil
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// Hit the root without finding .beads.
			return nil, fmt.Errorf("could not find .beads directory from %s", workdir)
		}
		dir = parent
	}
}
// EnsureDir creates the MQ directory (and parents) if missing; a no-op
// when it already exists.
func (q *Queue) EnsureDir() error {
	return os.MkdirAll(q.dir, 0755)
}
// generateID produces a unique MR ID of the form mr-<unix-secs>-<4 random
// hex bytes>.
func generateID() string {
	suffix := make([]byte, 4)
	_, _ = rand.Read(suffix) // crypto/rand; failure is not actionable here
	return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(suffix))
}
// Submit persists a new MR into the queue directory as <id>.json,
// filling in a generated ID and creation time when absent.
func (q *Queue) Submit(mr *MR) error {
	if err := q.EnsureDir(); err != nil {
		return fmt.Errorf("creating mq directory: %w", err)
	}
	if mr.ID == "" {
		mr.ID = generateID()
	}
	if mr.CreatedAt.IsZero() {
		mr.CreatedAt = time.Now()
	}
	encoded, err := json.MarshalIndent(mr, "", " ")
	if err != nil {
		return fmt.Errorf("marshaling MR: %w", err)
	}
	dest := filepath.Join(q.dir, mr.ID+".json")
	if err := os.WriteFile(dest, encoded, 0644); err != nil {
		return fmt.Errorf("writing MR file: %w", err)
	}
	return nil
}
// List returns all pending MRs, sorted by priority (lower first) and then
// creation time (older first). Malformed files are silently skipped.
// A missing queue directory is treated as an empty queue.
// Deprecated: Use ListByScore for priority-aware ordering.
func (q *Queue) List() ([]*MR, error) {
	entries, err := os.ReadDir(q.dir)
	if os.IsNotExist(err) {
		return nil, nil // Empty queue
	}
	if err != nil {
		return nil, fmt.Errorf("reading mq directory: %w", err)
	}
	var pending []*MR
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		if mr, loadErr := q.load(filepath.Join(q.dir, e.Name())); loadErr == nil {
			pending = append(pending, mr)
		}
	}
	sort.Slice(pending, func(a, b int) bool {
		if pending[a].Priority == pending[b].Priority {
			return pending[a].CreatedAt.Before(pending[b].CreatedAt)
		}
		return pending[a].Priority < pending[b].Priority
	})
	return pending, nil
}
// ListByScore returns all pending MRs ordered by priority score, highest
// first, using MR.ScoreAt with a single "now" snapshot so every MR is
// ranked against the same clock. Scoring considers convoy age (starvation
// prevention), issue priority, retry count, and MR age (FIFO tiebreaker).
// A missing queue directory is treated as an empty queue.
func (q *Queue) ListByScore() ([]*MR, error) {
	entries, err := os.ReadDir(q.dir)
	if os.IsNotExist(err) {
		return nil, nil // Empty queue
	}
	if err != nil {
		return nil, fmt.Errorf("reading mq directory: %w", err)
	}
	now := time.Now()
	var scored []*MR
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		mr, loadErr := q.load(filepath.Join(q.dir, e.Name()))
		if loadErr != nil {
			continue // Skip malformed files
		}
		scored = append(scored, mr)
	}
	sort.Slice(scored, func(a, b int) bool {
		return scored[a].ScoreAt(now) > scored[b].ScoreAt(now)
	})
	return scored, nil
}
// Get loads the MR stored as <id>.json in the queue directory.
func (q *Queue) Get(id string) (*MR, error) {
	return q.load(filepath.Join(q.dir, id+".json"))
}
// load reads and decodes a single MR JSON file. The raw os error is
// returned unwrapped so callers can test it with os.IsNotExist.
func (q *Queue) load(path string) (*MR, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	mr := new(MR)
	if err := json.Unmarshal(raw, mr); err != nil {
		return nil, err
	}
	return mr, nil
}
// Remove deletes an MR from the queue (after successful merge).
// Removing an MR that is already gone is not an error.
func (q *Queue) Remove(id string) error {
	err := os.Remove(filepath.Join(q.dir, id+".json"))
	if os.IsNotExist(err) {
		return nil
	}
	return err
}
// Count reports how many MR files are pending. Any read error (including
// a missing directory) counts as an empty queue.
func (q *Queue) Count() int {
	entries, err := os.ReadDir(q.dir)
	if err != nil {
		return 0
	}
	n := 0
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") {
			continue
		}
		n++
	}
	return n
}
// Dir exposes the on-disk queue directory path.
func (q *Queue) Dir() string {
	d := q.dir
	return d
}
// ClaimStaleTimeout is how long before a claimed MR is considered stale.
// If a worker claims an MR but doesn't process it within this time,
// another worker can reclaim it.
const ClaimStaleTimeout = 10 * time.Minute

// Claim attempts to claim an MR for processing by a specific worker.
// Returns nil if successful, ErrAlreadyClaimed if another worker has it,
// or ErrNotFound if the MR doesn't exist.
// Uses atomic file operations to prevent race conditions.
//
// NOTE(review): only the final write is atomic (temp file + rename). The
// load/check/write sequence as a whole is not: two workers that both read
// an unclaimed MR can each believe they won, and the later rename silently
// overwrites the earlier claim. Confirm whether concurrent claimers are
// expected before relying on this for mutual exclusion.
func (q *Queue) Claim(id, workerID string) error {
	path := filepath.Join(q.dir, id+".json")
	// Read current state
	mr, err := q.load(path)
	if err != nil {
		if os.IsNotExist(err) {
			return ErrNotFound
		}
		return fmt.Errorf("loading MR: %w", err)
	}
	// Check if already claimed by another worker. Re-claiming by the same
	// worker is allowed and simply refreshes ClaimedAt.
	if mr.ClaimedBy != "" && mr.ClaimedBy != workerID {
		// Check if claim is stale (worker may have crashed)
		if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
			return ErrAlreadyClaimed
		}
		// Stale claim - allow reclaim
	}
	// Claim the MR
	now := time.Now()
	mr.ClaimedBy = workerID
	mr.ClaimedAt = &now
	// Write atomically
	data, err := json.MarshalIndent(mr, "", " ")
	if err != nil {
		return fmt.Errorf("marshaling MR: %w", err)
	}
	// Write to temp file first, then rename (atomic on most filesystems)
	tmpPath := path + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("writing temp file: %w", err)
	}
	if err := os.Rename(tmpPath, path); err != nil {
		_ = os.Remove(tmpPath) // cleanup
		return fmt.Errorf("renaming temp file: %w", err)
	}
	return nil
}
// Release releases a claimed MR back to the queue by clearing its
// ClaimedBy/ClaimedAt fields. Called when processing fails and the MR
// should be retried. Releasing a missing MR is not an error.
//
// The rewrite uses the same temp-file-plus-rename pattern as Claim so a
// crash mid-write cannot leave a truncated MR file behind (the previous
// direct os.WriteFile could).
func (q *Queue) Release(id string) error {
	path := filepath.Join(q.dir, id+".json")
	mr, err := q.load(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // Already removed
		}
		return fmt.Errorf("loading MR: %w", err)
	}
	// Clear claim
	mr.ClaimedBy = ""
	mr.ClaimedAt = nil
	data, err := json.MarshalIndent(mr, "", " ")
	if err != nil {
		return fmt.Errorf("marshaling MR: %w", err)
	}
	// Atomic replace, consistent with Claim.
	tmpPath := path + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("writing temp file: %w", err)
	}
	if err := os.Rename(tmpPath, path); err != nil {
		_ = os.Remove(tmpPath) // cleanup
		return fmt.Errorf("renaming temp file: %w", err)
	}
	return nil
}
// ListUnclaimed returns MRs available for claiming: those with no claimant,
// plus those whose claim has gone stale (ClaimedAt older than
// ClaimStaleTimeout). Order follows List: priority, then creation time.
func (q *Queue) ListUnclaimed() ([]*MR, error) {
	all, err := q.List()
	if err != nil {
		return nil, err
	}
	var available []*MR
	for _, mr := range all {
		stale := mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) >= ClaimStaleTimeout
		if mr.ClaimedBy == "" || stale {
			available = append(available, mr)
		}
	}
	return available, nil
}
// ListClaimedBy returns the MRs currently claimed by workerID.
func (q *Queue) ListClaimedBy(workerID string) ([]*MR, error) {
	all, err := q.List()
	if err != nil {
		return nil, err
	}
	var mine []*MR
	for _, mr := range all {
		if mr.ClaimedBy != workerID {
			continue
		}
		mine = append(mine, mr)
	}
	return mine, nil
}
// Common errors for claiming
var (
ErrNotFound = fmt.Errorf("merge request not found")
ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker")
)
// SetBlockedBy marks an MR as blocked by a task (e.g., conflict resolution).
// When the blocking task closes, the MR becomes ready for processing again.
// Returns ErrNotFound if the MR file does not exist.
//
// The rewrite uses the same temp-file-plus-rename pattern as Claim so a
// crash mid-write cannot leave a truncated MR file behind (the previous
// direct os.WriteFile could).
func (q *Queue) SetBlockedBy(mrID, taskID string) error {
	path := filepath.Join(q.dir, mrID+".json")
	mr, err := q.load(path)
	if err != nil {
		if os.IsNotExist(err) {
			return ErrNotFound
		}
		return fmt.Errorf("loading MR: %w", err)
	}
	mr.BlockedBy = taskID
	data, err := json.MarshalIndent(mr, "", " ")
	if err != nil {
		return fmt.Errorf("marshaling MR: %w", err)
	}
	// Atomic replace, consistent with Claim.
	tmpPath := path + ".tmp"
	if err := os.WriteFile(tmpPath, data, 0644); err != nil {
		return fmt.Errorf("writing temp file: %w", err)
	}
	if err := os.Rename(tmpPath, path); err != nil {
		_ = os.Remove(tmpPath) // cleanup
		return fmt.Errorf("renaming temp file: %w", err)
	}
	return nil
}
// ClearBlockedBy removes the blocking task from an MR, making it eligible
// for processing again. Implemented as SetBlockedBy with an empty task ID,
// so it shares that method's ErrNotFound behavior.
func (q *Queue) ClearBlockedBy(mrID string) error {
	return q.SetBlockedBy(mrID, "")
}
// IsBlocked checks if an MR is blocked by a task that is still open.
// If blocked, returns true and the blocking task ID.
// checkStatus is a function that checks if a bead is still open.
//
// Fail-open: if checkStatus errors, the MR is reported as not blocked so
// a flaky status source cannot wedge the queue. The returned error is
// always nil in the current implementation; the signature keeps the slot
// for future stricter handling.
func (mr *MR) IsBlocked(checkStatus func(beadID string) (isOpen bool, err error)) (bool, string, error) {
	if mr.BlockedBy == "" {
		return false, "", nil
	}
	isOpen, err := checkStatus(mr.BlockedBy)
	if err != nil {
		// If we can't check status, assume not blocked (fail open)
		return false, "", nil
	}
	return isOpen, mr.BlockedBy, nil
}

// BeadStatusChecker is a function type that checks if a bead is open.
// Returns true if the bead is open (not closed), false if closed or not found.
type BeadStatusChecker func(beadID string) (isOpen bool, err error)
// ListReady returns MRs that are ready for processing:
//   - Not claimed by another worker (or claim is stale per ClaimStaleTimeout)
//   - Not blocked by an open task
//
// Sorted by priority score (highest first), inherited from ListByScore.
// The checkStatus function is used to check if blocking tasks are still
// open; passing nil skips block-checking entirely, so every unclaimed MR
// is treated as ready.
func (q *Queue) ListReady(checkStatus BeadStatusChecker) ([]*MR, error) {
	all, err := q.ListByScore()
	if err != nil {
		return nil, err
	}
	var ready []*MR
	for _, mr := range all {
		// Skip if claimed by another worker (and not stale)
		if mr.ClaimedBy != "" {
			if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout {
				continue
			}
			// Stale claim - include in ready list
		}
		// Skip if blocked by an open task
		if mr.BlockedBy != "" && checkStatus != nil {
			isOpen, err := checkStatus(mr.BlockedBy)
			if err == nil && isOpen {
				// Blocked by an open task - skip
				continue
			}
			// If error or task closed, proceed (fail open)
		}
		ready = append(ready, mr)
	}
	return ready, nil
}
// ListBlocked returns MRs whose blocking task is still open, for
// reporting/monitoring. With a nil checkStatus nothing can be confirmed
// blocked, so the result is empty.
func (q *Queue) ListBlocked(checkStatus BeadStatusChecker) ([]*MR, error) {
	all, err := q.List()
	if err != nil {
		return nil, err
	}
	var blocked []*MR
	if checkStatus == nil {
		return blocked, nil
	}
	for _, mr := range all {
		if mr.BlockedBy == "" {
			continue
		}
		isOpen, statusErr := checkStatus(mr.BlockedBy)
		if statusErr == nil && isOpen {
			blocked = append(blocked, mr)
		}
	}
	return blocked, nil
}

View File

@@ -1,334 +0,0 @@
package mrqueue
import (
"testing"
"time"
)
// TestScoreMR_BaseScore pins the default-config score of a mid-priority MR
// with no convoy, no retries, and zero age: base score plus priority bonus
// only.
func TestScoreMR_BaseScore(t *testing.T) {
	now := time.Now()
	config := DefaultScoreConfig()
	input := ScoreInput{
		Priority:    2, // P2 (medium)
		MRCreatedAt: now,
		RetryCount:  0,
		Now:         now,
	}
	score := ScoreMR(input, config)
	// BaseScore(1000) + Priority(2 gives 4-2=2, so 2*100=200) = 1200
	expected := 1200.0
	if score != expected {
		t.Errorf("expected score %f, got %f", expected, score)
	}
}
func TestScoreMR_PriorityOrdering(t *testing.T) {
now := time.Now()
tests := []struct {
priority int
expected float64
}{
{0, 1400.0}, // P0: base(1000) + (4-0)*100 = 1400
{1, 1300.0}, // P1: base(1000) + (4-1)*100 = 1300
{2, 1200.0}, // P2: base(1000) + (4-2)*100 = 1200
{3, 1100.0}, // P3: base(1000) + (4-3)*100 = 1100
{4, 1000.0}, // P4: base(1000) + (4-4)*100 = 1000
}
for _, tt := range tests {
t.Run("P"+string(rune('0'+tt.priority)), func(t *testing.T) {
input := ScoreInput{
Priority: tt.priority,
MRCreatedAt: now,
Now: now,
}
score := ScoreMRWithDefaults(input)
if score != tt.expected {
t.Errorf("P%d: expected %f, got %f", tt.priority, tt.expected, score)
}
})
}
// Verify ordering: P0 > P1 > P2 > P3 > P4
for i := 0; i < 4; i++ {
input1 := ScoreInput{Priority: i, MRCreatedAt: now, Now: now}
input2 := ScoreInput{Priority: i + 1, MRCreatedAt: now, Now: now}
score1 := ScoreMRWithDefaults(input1)
score2 := ScoreMRWithDefaults(input2)
if score1 <= score2 {
t.Errorf("P%d (%f) should score higher than P%d (%f)", i, score1, i+1, score2)
}
}
}
func TestScoreMR_ConvoyAgeEscalation(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// MR without convoy
noConvoy := ScoreInput{
Priority: 2,
MRCreatedAt: now,
Now: now,
}
scoreNoConvoy := ScoreMR(noConvoy, config)
// MR with 24-hour old convoy
convoyTime := now.Add(-24 * time.Hour)
withConvoy := ScoreInput{
Priority: 2,
MRCreatedAt: now,
ConvoyCreatedAt: &convoyTime,
Now: now,
}
scoreWithConvoy := ScoreMR(withConvoy, config)
// 24 hours * 10 pts/hour = 240 extra points
expectedDiff := 240.0
actualDiff := scoreWithConvoy - scoreNoConvoy
if actualDiff != expectedDiff {
t.Errorf("expected convoy age to add %f pts, got %f", expectedDiff, actualDiff)
}
}
func TestScoreMR_ConvoyStarvationPrevention(t *testing.T) {
now := time.Now()
// P4 issue in 48-hour old convoy vs P0 issue with no convoy
oldConvoy := now.Add(-48 * time.Hour)
lowPriorityOldConvoy := ScoreInput{
Priority: 4, // P4 (lowest)
MRCreatedAt: now,
ConvoyCreatedAt: &oldConvoy,
Now: now,
}
highPriorityNoConvoy := ScoreInput{
Priority: 0, // P0 (highest)
MRCreatedAt: now,
Now: now,
}
scoreOldConvoy := ScoreMRWithDefaults(lowPriorityOldConvoy)
scoreHighPriority := ScoreMRWithDefaults(highPriorityNoConvoy)
// P4 with 48h convoy: 1000 + 0 + 480 = 1480
// P0 with no convoy: 1000 + 400 + 0 = 1400
// Old convoy should win (starvation prevention)
if scoreOldConvoy <= scoreHighPriority {
t.Errorf("48h old P4 convoy (%f) should beat P0 no convoy (%f) for starvation prevention",
scoreOldConvoy, scoreHighPriority)
}
}
func TestScoreMR_RetryPenalty(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// No retries
noRetry := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 0,
Now: now,
}
scoreNoRetry := ScoreMR(noRetry, config)
// 3 retries
threeRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 3,
Now: now,
}
scoreThreeRetries := ScoreMR(threeRetries, config)
// 3 retries * 50 pts penalty = 150 pts less
expectedDiff := 150.0
actualDiff := scoreNoRetry - scoreThreeRetries
if actualDiff != expectedDiff {
t.Errorf("expected 3 retries to lose %f pts, lost %f", expectedDiff, actualDiff)
}
}
func TestScoreMR_RetryPenaltyCapped(t *testing.T) {
now := time.Now()
config := DefaultScoreConfig()
// Max penalty is 300, so 10 retries should be same as 6
sixRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 6,
Now: now,
}
tenRetries := ScoreInput{
Priority: 2,
MRCreatedAt: now,
RetryCount: 10,
Now: now,
}
scoreSix := ScoreMR(sixRetries, config)
scoreTen := ScoreMR(tenRetries, config)
if scoreSix != scoreTen {
t.Errorf("penalty should be capped: 6 retries (%f) should equal 10 retries (%f)",
scoreSix, scoreTen)
}
// Both should be base(1000) + priority(200) - maxPenalty(300) = 900
expected := 900.0
if scoreSix != expected {
t.Errorf("expected capped score %f, got %f", expected, scoreSix)
}
}
func TestScoreMR_MRAgeAsTiebreaker(t *testing.T) {
now := time.Now()
// Two MRs with same priority, one submitted 10 hours ago
oldMR := ScoreInput{
Priority: 2,
MRCreatedAt: now.Add(-10 * time.Hour),
Now: now,
}
newMR := ScoreInput{
Priority: 2,
MRCreatedAt: now,
Now: now,
}
scoreOld := ScoreMRWithDefaults(oldMR)
scoreNew := ScoreMRWithDefaults(newMR)
// Old MR should have 10 pts more (1 pt/hour)
expectedDiff := 10.0
actualDiff := scoreOld - scoreNew
if actualDiff != expectedDiff {
t.Errorf("older MR should score %f more, got %f", expectedDiff, actualDiff)
}
}
// TestScoreMR_Deterministic verifies scoring is a pure function of its
// input: with a fixed clock in ScoreInput.Now, repeated calls must agree.
func TestScoreMR_Deterministic(t *testing.T) {
	fixedNow := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	convoyTime := time.Date(2024, 12, 31, 12, 0, 0, 0, time.UTC)
	mrTime := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC)
	input := ScoreInput{
		Priority:        1,
		MRCreatedAt:     mrTime,
		ConvoyCreatedAt: &convoyTime,
		RetryCount:      2,
		Now:             fixedNow,
	}
	want := ScoreMRWithDefaults(input)
	// Run 100 times, should always be same
	for i := 0; i < 100; i++ {
		if got := ScoreMRWithDefaults(input); got != want {
			t.Errorf("score not deterministic: iteration %d got %f, expected %f", i, got, want)
		}
	}
}
func TestScoreMR_InvalidPriorityClamped(t *testing.T) {
now := time.Now()
// Negative priority should clamp to 0 bonus (priority=4)
negativePriority := ScoreInput{
Priority: -1,
MRCreatedAt: now,
Now: now,
}
scoreNegative := ScoreMRWithDefaults(negativePriority)
// Very high priority should clamp to max bonus (priority=0)
highPriority := ScoreInput{
Priority: 10,
MRCreatedAt: now,
Now: now,
}
scoreHigh := ScoreMRWithDefaults(highPriority)
// Negative priority gets clamped to max bonus (4*100=400)
if scoreNegative != 1400.0 {
t.Errorf("negative priority should clamp to P0 bonus, got %f", scoreNegative)
}
// High priority (10) gives 4-10=-6, clamped to 0
if scoreHigh != 1000.0 {
t.Errorf("priority>4 should give 0 bonus, got %f", scoreHigh)
}
}
// TestMR_Score pins MR.ScoreAt for an MR combining all scoring inputs:
// convoy age bonus, priority bonus, retry penalty, and MR age tiebreaker.
func TestMR_Score(t *testing.T) {
	now := time.Now()
	convoyTime := now.Add(-12 * time.Hour)
	mr := &MR{
		Priority:        1,
		CreatedAt:       now.Add(-2 * time.Hour),
		ConvoyCreatedAt: &convoyTime,
		RetryCount:      1,
	}
	score := mr.ScoreAt(now)
	// base(1000) + convoy(12*10=120) + priority(3*100=300) - retry(1*50=50) + mrAge(2*1=2)
	expected := 1000.0 + 120.0 + 300.0 - 50.0 + 2.0
	if score != expected {
		t.Errorf("MR.ScoreAt expected %f, got %f", expected, score)
	}
}
func TestScoreMR_EdgeCases(t *testing.T) {
now := time.Now()
tests := []struct {
name string
input ScoreInput
}{
{
name: "zero time MR",
input: ScoreInput{
Priority: 2,
MRCreatedAt: time.Time{},
Now: now,
},
},
{
name: "future MR",
input: ScoreInput{
Priority: 2,
MRCreatedAt: now.Add(24 * time.Hour),
Now: now,
},
},
{
name: "future convoy",
input: ScoreInput{
Priority: 2,
MRCreatedAt: now,
ConvoyCreatedAt: func() *time.Time { t := now.Add(24 * time.Hour); return &t }(),
Now: now,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Should not panic
score := ScoreMRWithDefaults(tt.input)
// Score should still be reasonable (>= base - maxPenalty)
if score < 700 {
t.Errorf("score %f unexpectedly low for edge case", score)
}
})
}
}

View File

@@ -4,14 +4,13 @@ import (
"fmt"
"io"
"os"
"time"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/mrqueue"
)
// DefaultRefineryHandler provides the default implementation for Refinery protocol handlers.
// It receives MERGE_READY messages from the Witness and adds work to the merge queue.
// It receives MERGE_READY messages from the Witness and acknowledges verified work.
// Note: The Refinery now queries beads directly for merge requests (via ReadyWithType).
type DefaultRefineryHandler struct {
// Rig is the name of the rig this refinery processes.
Rig string
@@ -19,9 +18,6 @@ type DefaultRefineryHandler struct {
// WorkDir is the working directory for operations.
WorkDir string
// Queue is the merge request queue.
Queue *mrqueue.Queue
// Router is used to send mail messages.
Router *mail.Router
@@ -34,7 +30,6 @@ func NewRefineryHandler(rig, workDir string) *DefaultRefineryHandler {
return &DefaultRefineryHandler{
Rig: rig,
WorkDir: workDir,
Queue: mrqueue.New(workDir),
Router: mail.NewRouter(workDir),
Output: os.Stdout,
}
@@ -46,10 +41,10 @@ func (h *DefaultRefineryHandler) SetOutput(w io.Writer) {
}
// HandleMergeReady handles a MERGE_READY message from Witness.
// When a polecat's work is verified and ready, the Refinery:
// 1. Validates the merge request
// 2. Adds it to the merge queue
// 3. Acknowledges receipt
// When a polecat's work is verified and ready, the Refinery acknowledges receipt.
//
// NOTE: The merge-request bead is created by `gt done`, so we no longer need
// to add to the mrqueue here. The Refinery queries beads directly for ready MRs.
func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error {
_, _ = fmt.Fprintf(h.Output, "[Refinery] MERGE_READY received for polecat %s\n", payload.Polecat)
_, _ = fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch)
@@ -64,25 +59,10 @@ func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) er
return fmt.Errorf("missing polecat in MERGE_READY payload")
}
// Create merge request (ID is generated by Submit if empty)
mr := &mrqueue.MR{
Branch: payload.Branch,
Worker: payload.Polecat,
SourceIssue: payload.Issue,
Target: "main", // Default target, could be passed in payload
Rig: payload.Rig,
Title: fmt.Sprintf("Merge %s work on %s", payload.Polecat, payload.Issue),
CreatedAt: time.Now(),
}
// Add to queue
if err := h.Queue.Submit(mr); err != nil {
_, _ = fmt.Fprintf(h.Output, "[Refinery] Error adding to queue: %v\n", err)
return fmt.Errorf("failed to add merge request to queue: %w", err)
}
_, _ = fmt.Fprintf(h.Output, "[Refinery] ✓ Added to merge queue: %s\n", mr.ID)
_, _ = fmt.Fprintf(h.Output, " Queue length: %d\n", h.Queue.Count())
// The merge-request bead is created by `gt done` with gt:merge-request label.
// The Refinery queries beads directly via ReadyWithType("merge-request").
// No need to add to mrqueue - that was a duplicate tracking file.
_, _ = fmt.Fprintf(h.Output, "[Refinery] ✓ Work verified - Refinery will pick up MR via beads query\n")
return nil
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/git"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/protocol"
"github.com/steveyegge/gastown/internal/rig"
)
@@ -70,18 +69,35 @@ func DefaultMergeQueueConfig() *MergeQueueConfig {
}
}
// MRInfo holds merge request information for display and processing.
// It replaces mrqueue.MR after the mrqueue package removal; values are
// populated from beads merge-request issues (see ListReadyMRs / ListBlockedMRs).
type MRInfo struct {
	ID              string     // Bead ID (e.g., "gt-abc123")
	Branch          string     // Source branch (e.g., "polecat/nux")
	Target          string     // Target branch (e.g., "main")
	SourceIssue     string     // The work item being merged
	Worker          string     // Who did the work
	Rig             string     // Which rig
	Title           string     // MR title
	Priority        int        // Priority (lower = higher priority)
	AgentBead       string     // Agent bead ID that created this MR
	RetryCount      int        // Conflict retry count
	ConvoyID        string     // Parent convoy ID if part of a convoy
	ConvoyCreatedAt *time.Time // Convoy creation time (nil if not part of a convoy)
	CreatedAt       time.Time  // MR creation time (zero if absent/unparseable)
	BlockedBy       string     // Task ID blocking this MR (empty if unblocked)
}
// Engineer is the merge queue processor that polls for ready merge-requests
// and processes them according to the merge queue design.
type Engineer struct {
rig *rig.Rig
beads *beads.Beads
mrQueue *mrqueue.Queue
git *git.Git
config *MergeQueueConfig
workDir string
output io.Writer // Output destination for user-facing messages
eventLogger *mrqueue.EventLogger
router *mail.Router // Mail router for sending protocol messages
rig *rig.Rig
beads *beads.Beads
git *git.Git
config *MergeQueueConfig
workDir string
output io.Writer // Output destination for user-facing messages
router *mail.Router // Mail router for sending protocol messages
// stopCh is used for graceful shutdown
stopCh chan struct{}
@@ -102,16 +118,14 @@ func NewEngineer(r *rig.Rig) *Engineer {
}
return &Engineer{
rig: r,
beads: beads.New(r.Path),
mrQueue: mrqueue.New(r.Path),
git: git.NewGit(gitDir),
config: cfg,
workDir: gitDir,
output: os.Stdout,
eventLogger: mrqueue.NewEventLoggerFromRig(r.Path),
router: mail.NewRouter(r.Path),
stopCh: make(chan struct{}),
rig: r,
beads: beads.New(r.Path),
git: git.NewGit(gitDir),
config: cfg,
workDir: gitDir,
output: os.Stdout,
router: mail.NewRouter(r.Path),
stopCh: make(chan struct{}),
}
}
@@ -479,31 +493,21 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
_, _ = fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)
}
// ProcessMRFromQueue processes a merge request from wisp queue.
func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult {
// MR fields are directly on the struct (no parsing needed)
_, _ = fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:")
// ProcessMRInfo processes a merge request from MRInfo.
func (e *Engineer) ProcessMRInfo(ctx context.Context, mr *MRInfo) ProcessResult {
// MR fields are directly on the struct
_, _ = fmt.Fprintln(e.output, "[Engineer] Processing MR:")
_, _ = fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch)
_, _ = fmt.Fprintf(e.output, " Target: %s\n", mr.Target)
_, _ = fmt.Fprintf(e.output, " Worker: %s\n", mr.Worker)
_, _ = fmt.Fprintf(e.output, " Source: %s\n", mr.SourceIssue)
// Emit merge_started event
if err := e.eventLogger.LogMergeStarted(mr); err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_started event: %v\n", err)
}
// Use the shared merge logic
return e.doMerge(ctx, mr.Branch, mr.Target, mr.SourceIssue)
}
// handleSuccessFromQueue handles a successful merge from wisp queue.
func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) {
// Emit merged event
if err := e.eventLogger.LogMerged(mr, result.MergeCommit); err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merged event: %v\n", err)
}
// HandleMRInfoSuccess handles a successful merge from MRInfo.
func (e *Engineer) HandleMRInfoSuccess(mr *MRInfo, result ProcessResult) {
// Release merge slot if this was a conflict resolution
// The slot is held while conflict resolution is in progress
holder := e.rig.Name + "/refinery"
@@ -518,7 +522,7 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult)
_, _ = fmt.Fprintf(e.output, "[Engineer] Released merge slot\n")
}
// Update and close the MR bead (matches handleSuccess behavior)
// Update and close the MR bead
if mr.ID != "" {
// Fetch the MR bead to update its fields
mrBead, err := e.beads.Show(mr.ID)
@@ -572,24 +576,14 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult)
}
}
// 3. Remove MR from queue (ephemeral - just delete the file)
if err := e.mrQueue.Remove(mr.ID); err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to remove MR from queue: %v\n", err)
}
// 4. Log success
// 3. Log success
_, _ = fmt.Fprintf(e.output, "[Engineer] ✓ Merged: %s (commit: %s)\n", mr.ID, result.MergeCommit)
}
// handleFailureFromQueue handles a failed merge from wisp queue.
// HandleMRInfoFailure handles a failed merge from MRInfo.
// For conflicts, creates a resolution task and blocks the MR until resolved.
// This enables non-blocking delegation: the queue continues to the next MR.
func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) {
// Emit merge_failed event
if err := e.eventLogger.LogMergeFailed(mr, result.Error); err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_failed event: %v\n", err)
}
func (e *Engineer) HandleMRInfoFailure(mr *MRInfo, result ProcessResult) {
// Notify Witness of the failure so polecat can be alerted
// Determine failure type from result
failureType := "build"
@@ -608,13 +602,13 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
// If this was a conflict, create a conflict-resolution task for dispatch
// and block the MR until the task is resolved (non-blocking delegation)
if result.Conflict {
taskID, err := e.createConflictResolutionTask(mr, result)
taskID, err := e.createConflictResolutionTaskForMR(mr, result)
if err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to create conflict resolution task: %v\n", err)
} else {
// Block the MR on the conflict resolution task
} else if taskID != "" {
// Block the MR on the conflict resolution task using beads dependency
// When the task closes, the MR unblocks and re-enters the ready queue
if err := e.mrQueue.SetBlockedBy(mr.ID, taskID); err != nil {
if err := e.beads.AddDependency(mr.ID, taskID); err != nil {
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to block MR on task: %v\n", err)
} else {
_, _ = fmt.Fprintf(e.output, "[Engineer] MR %s blocked on conflict task %s (non-blocking delegation)\n", mr.ID, taskID)
@@ -631,7 +625,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
}
}
// createConflictResolutionTask creates a dispatchable task for resolving merge conflicts.
// createConflictResolutionTaskForMR creates a dispatchable task for resolving merge conflicts.
// This task will be picked up by bd ready and can be slung to a fresh polecat (spawned on demand).
// Returns the created task's ID for blocking the MR until resolution.
//
@@ -647,7 +641,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult)
// This serializes conflict resolution - only one polecat can resolve conflicts at a time.
// If the slot is already held, we skip creating the task and let the MR stay in queue.
// When the current resolution completes and merges, the slot is released.
func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, _ ProcessResult) (string, error) { // result unused but kept for future merge diagnostics
func (e *Engineer) createConflictResolutionTaskForMR(mr *MRInfo, _ ProcessResult) (string, error) { // result unused but kept for future merge diagnostics
// === MERGE SLOT GATE: Serialize conflict resolution ===
// Ensure merge slot exists (idempotent)
slotID, err := e.beads.MergeSlotEnsureExists()
@@ -743,14 +737,11 @@ The Refinery will automatically retry the merge after you force-push.`,
_, _ = fmt.Fprintf(e.output, "[Engineer] Created conflict resolution task: %s (P%d)\n", task.ID, task.Priority)
// Update the MR's retry count for priority scoring
mr.RetryCount = retryCount
return task.ID, nil
}
// IsBeadOpen checks if a bead is still open (not closed).
// This is used as a status checker for mrqueue.ListReady to filter blocked MRs.
// This is used as a status checker to filter blocked MRs.
func (e *Engineer) IsBeadOpen(beadID string) (bool, error) {
issue, err := e.beads.Show(beadID)
if err != nil {
@@ -762,15 +753,172 @@ func (e *Engineer) IsBeadOpen(beadID string) (bool, error) {
}
// ListReadyMRs returns MRs that are ready for processing:
// - Not claimed by another worker (or claim is stale)
// - Not blocked by an open task
// Sorted by priority score (highest first).
func (e *Engineer) ListReadyMRs() ([]*mrqueue.MR, error) {
return e.mrQueue.ListReady(e.IsBeadOpen)
// - Not claimed by another worker (checked via assignee field)
// - Not blocked by an open task (handled by bd ready)
// Sorted by priority (highest first).
//
// This queries beads for merge-request wisps.
func (e *Engineer) ListReadyMRs() ([]*MRInfo, error) {
	ready, err := e.beads.ReadyWithType("merge-request")
	if err != nil {
		return nil, fmt.Errorf("querying beads for merge-requests: %w", err)
	}

	var result []*MRInfo
	for _, issue := range ready {
		fields := beads.ParseMRFields(issue)
		if fields == nil {
			// Not a well-formed merge-request wisp; ignore it.
			continue
		}
		if issue.Assignee != "" {
			// Already claimed by another worker.
			// TODO: Add stale claim detection based on updated_at
			continue
		}

		// Convoy creation time is optional; leave nil when absent or unparseable.
		var convoyTime *time.Time
		if fields.ConvoyCreatedAt != "" {
			if parsed, perr := time.Parse(time.RFC3339, fields.ConvoyCreatedAt); perr == nil {
				convoyTime = &parsed
			}
		}

		// Issue creation time stays the zero time when absent or unparseable.
		var issueTime time.Time
		if issue.CreatedAt != "" {
			if parsed, perr := time.Parse(time.RFC3339, issue.CreatedAt); perr == nil {
				issueTime = parsed
			}
		}

		result = append(result, &MRInfo{
			ID:              issue.ID,
			Branch:          fields.Branch,
			Target:          fields.Target,
			SourceIssue:     fields.SourceIssue,
			Worker:          fields.Worker,
			Rig:             fields.Rig,
			Title:           issue.Title,
			Priority:        issue.Priority,
			AgentBead:       fields.AgentBead,
			RetryCount:      fields.RetryCount,
			ConvoyID:        fields.ConvoyID,
			ConvoyCreatedAt: convoyTime,
			CreatedAt:       issueTime,
		})
	}
	return result, nil
}
// ListBlockedMRs returns MRs that are blocked by open tasks.
// Useful for monitoring/reporting.
func (e *Engineer) ListBlockedMRs() ([]*mrqueue.MR, error) {
return e.mrQueue.ListBlocked(e.IsBeadOpen)
//
// This queries beads for blocked merge-request issues.
func (e *Engineer) ListBlockedMRs() ([]*MRInfo, error) {
	// Query all open merge-request issues (both ready and blocked).
	issues, err := e.beads.List(beads.ListOptions{
		Status:   "open",
		Label:    "gt:merge-request",
		Priority: -1, // No priority filter
	})
	if err != nil {
		return nil, fmt.Errorf("querying beads for merge-requests: %w", err)
	}

	var mrs []*MRInfo
	for _, issue := range issues {
		// Find the first still-open blocker in a single pass.
		// (Previously BlockedBy was scanned twice - once to detect an open
		// blocker and once to pick it - calling IsBeadOpen, and therefore
		// beads.Show, up to twice per blocker.)
		blockedBy := ""
		for _, blockerID := range issue.BlockedBy {
			// Best-effort check: an error is treated as "not open" so a
			// transient beads failure does not report the MR as blocked.
			if isOpen, err := e.IsBeadOpen(blockerID); err == nil && isOpen {
				blockedBy = blockerID
				break
			}
		}
		if blockedBy == "" {
			continue // No open blockers (or none at all): not blocked.
		}

		fields := beads.ParseMRFields(issue)
		if fields == nil {
			continue // Not a well-formed merge-request wisp.
		}

		// Parse convoy created_at if present; nil when absent or unparseable.
		var convoyCreatedAt *time.Time
		if fields.ConvoyCreatedAt != "" {
			if t, err := time.Parse(time.RFC3339, fields.ConvoyCreatedAt); err == nil {
				convoyCreatedAt = &t
			}
		}

		// Parse issue created_at; zero time when absent or unparseable.
		var createdAt time.Time
		if issue.CreatedAt != "" {
			if t, err := time.Parse(time.RFC3339, issue.CreatedAt); err == nil {
				createdAt = t
			}
		}

		mrs = append(mrs, &MRInfo{
			ID:              issue.ID,
			Branch:          fields.Branch,
			Target:          fields.Target,
			SourceIssue:     fields.SourceIssue,
			Worker:          fields.Worker,
			Rig:             fields.Rig,
			Title:           issue.Title,
			Priority:        issue.Priority,
			AgentBead:       fields.AgentBead,
			RetryCount:      fields.RetryCount,
			ConvoyID:        fields.ConvoyID,
			ConvoyCreatedAt: convoyCreatedAt,
			CreatedAt:       createdAt,
			BlockedBy:       blockedBy,
		})
	}
	return mrs, nil
}
// ClaimMR marks an MR as in-progress by assigning it to workerID
// (typically the refinery's identifier, e.g. "gastown/refinery").
// Assigned MRs are skipped by ListReadyMRs until released.
func (e *Engineer) ClaimMR(mrID, workerID string) error {
	// Claiming is just setting the bead's assignee to the worker.
	opts := beads.UpdateOptions{Assignee: &workerID}
	return e.beads.Update(mrID, opts)
}
// ReleaseMR returns a claimed MR to the unclaimed pool by clearing
// its assignee field, making it visible to ListReadyMRs again.
func (e *Engineer) ReleaseMR(mrID string) error {
	// An empty assignee means "unclaimed".
	unassigned := ""
	opts := beads.UpdateOptions{Assignee: &unassigned}
	return e.beads.Update(mrID, opts)
}

View File

@@ -16,7 +16,6 @@ import (
"github.com/steveyegge/gastown/internal/constants"
"github.com/steveyegge/gastown/internal/events"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/runtime"
"github.com/steveyegge/gastown/internal/session"
@@ -358,7 +357,7 @@ func (m *Manager) calculateIssueScore(issue *beads.Issue, now time.Time) float64
}
// Build score input
input := mrqueue.ScoreInput{
input := ScoreInput{
Priority: issue.Priority,
MRCreatedAt: mrCreatedAt,
Now: now,
@@ -376,7 +375,7 @@ func (m *Manager) calculateIssueScore(issue *beads.Issue, now time.Time) float64
}
}
return mrqueue.ScoreMRWithDefaults(input)
return ScoreMRWithDefaults(input)
}
// issueToMR converts a beads issue to a MergeRequest.

View File

@@ -1,57 +1,7 @@
// Package mrqueue provides merge request queue storage and priority scoring.
//
// # MQ Priority Objective Function
//
// The merge queue uses a priority scoring function to determine processing order.
// Higher scores mean higher priority (process first).
//
// ## Scoring Formula
//
// score = BaseScore
// + ConvoyAgeWeight * hoursOld(convoy) // Prevent starvation
// + PriorityWeight * (4 - priority) // P0 > P4
// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing
// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker
//
// ## Default Weights
//
// BaseScore: 1000.0 (keeps all scores positive)
// ConvoyAgeWeight: 10.0 (10 pts/hour = 240 pts/day)
// PriorityWeight: 100.0 (P0=+400, P4=+0)
// RetryPenalty: 50.0 (each retry loses 50 pts)
// MRAgeWeight: 1.0 (1 pt/hour, minor FIFO factor)
// MaxRetryPenalty: 300.0 (caps at 6 retries worth)
//
// ## Design Principles
//
// 1. Deterministic: same inputs always produce same score (uses explicit Now param)
//
// 2. Convoy Starvation Prevention: older convoys escalate in priority. A 48-hour
// old P4 convoy will beat a fresh P0 standalone issue (+480 vs +400).
//
// 3. Priority Respect: within similar convoy ages, P0 issues beat P4 issues.
//
// 4. Thrashing Prevention: MRs that repeatedly fail with conflicts get
// deprioritized, giving the repo state time to stabilize.
//
// 5. FIFO Fairness: within same convoy/priority/retry state, older MRs go first.
//
// ## Example Scores
//
// Fresh P0, no convoy: 1400 (1000 + 400)
// Fresh P4, no convoy: 1000 (1000 + 0)
// Fresh P2, 24h convoy: 1440 (1000 + 200 + 240)
// Fresh P4, 48h convoy: 1480 (1000 + 0 + 480)
// P2, 24h convoy, 3 retries: 1290 (1000 + 200 + 240 - 150)
// P0, no convoy, 6+ retries (capped): 1100 (1000 + 400 - 300)
//
// ## Tuning
//
// All weights are configurable via ScoreConfig. The defaults are designed so:
// - A 48-hour convoy beats any standalone priority (starvation prevention)
// - Priority differences dominate within same convoy
// - Retry penalty is significant but capped (eventual progress guaranteed)
package mrqueue
// Package refinery provides the merge queue processing agent.
// This file contains priority scoring logic for merge requests.
package refinery
import (
"time"
@@ -134,13 +84,6 @@ type ScoreInput struct {
// + PriorityWeight * (4 - priority) // P0=+400, P4=+0
// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing
// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker
//
// Design principles:
// - Deterministic: same inputs always produce same score
// - Convoy starvation prevention: older convoys escalate in priority
// - Priority respect: P0 bugs beat P4 backlog items
// - Thrashing prevention: repeated failures get deprioritized
// - FIFO fairness: within same convoy/priority, older MRs go first
func ScoreMR(input ScoreInput, config ScoreConfig) float64 {
now := input.Now
if now.IsZero() {
@@ -192,12 +135,12 @@ func ScoreMRWithDefaults(input ScoreInput) float64 {
// Score calculates the priority score for this MR using default config.
// Higher scores mean higher priority (process first).
func (mr *MR) Score() float64 {
func (mr *MRInfo) Score() float64 {
return mr.ScoreAt(time.Now())
}
// ScoreAt calculates the priority score at a specific time (for deterministic testing).
func (mr *MR) ScoreAt(now time.Time) float64 {
func (mr *MRInfo) ScoreAt(now time.Time) float64 {
input := ScoreInput{
Priority: mr.Priority,
MRCreatedAt: mr.CreatedAt,

View File

@@ -1,102 +1,42 @@
package feed
import (
"bufio"
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"time"
"github.com/steveyegge/gastown/internal/mrqueue"
)
// MQEventSource reads MQ lifecycle events from mq_events.jsonl
// MQEventSource was used to read MQ lifecycle events from mq_events.jsonl.
// The mrqueue package has been removed, so this is now a no-op stub.
// MR events can be observed via beads activity instead.
type MQEventSource struct {
file *os.File
events chan Event
cancel context.CancelFunc
logPath string
events chan Event
cancel context.CancelFunc
}
// NewMQEventSource creates a source that tails MQ events from a beads directory.
// NewMQEventSource creates a stub source that produces no events.
// The mrqueue event log is no longer written.
func NewMQEventSource(beadsDir string) (*MQEventSource, error) {
logPath := filepath.Join(beadsDir, "mq_events.jsonl")
// Create file if it doesn't exist
if _, err := os.Stat(logPath); os.IsNotExist(err) {
// Ensure directory exists
if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil {
return nil, err
}
// Create empty file
f, err := os.Create(logPath)
if err != nil {
return nil, err
}
_ = f.Close() //nolint:gosec // G104: best-effort close on file creation
}
file, err := os.Open(logPath)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
source := &MQEventSource{
file: file,
events: make(chan Event, 100),
cancel: cancel,
logPath: logPath,
events: make(chan Event, 1),
cancel: cancel,
}
go source.tail(ctx)
// Start a goroutine that just waits for cancellation
go func() {
<-ctx.Done()
close(source.events)
}()
return source, nil
}
// NewMQEventSourceFromWorkDir creates an MQ event source by finding the beads directory.
// NewMQEventSourceFromWorkDir creates an MQ event source (stub).
func NewMQEventSourceFromWorkDir(workDir string) (*MQEventSource, error) {
beadsDir, err := FindBeadsDir(workDir)
if err != nil {
return nil, err
}
return NewMQEventSource(beadsDir)
return NewMQEventSource("")
}
// tail follows the MQ event log file and sends events.
func (s *MQEventSource) tail(ctx context.Context) {
defer close(s.events)
// Seek to end for live tailing
_, _ = s.file.Seek(0, 2)
scanner := bufio.NewScanner(s.file)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for scanner.Scan() {
line := scanner.Text()
if event := parseMQEventLine(line); event != nil {
select {
case s.events <- *event:
default:
// Drop event if channel full
}
}
}
}
}
}
// Events returns the event channel.
// Events returns the event channel (always empty).
func (s *MQEventSource) Events() <-chan Event {
return s.events
}
@@ -104,86 +44,5 @@ func (s *MQEventSource) Events() <-chan Event {
// Close stops the source.
func (s *MQEventSource) Close() error {
s.cancel()
return s.file.Close()
}
// parseMQEventLine converts one mq_events.jsonl line into a feed Event.
// Blank lines and lines that are not valid JSON yield nil.
func parseMQEventLine(line string) *Event {
	if strings.TrimSpace(line) == "" {
		return nil
	}

	var ev mrqueue.Event
	if json.Unmarshal([]byte(line), &ev) != nil {
		return nil
	}

	// All MQ events are attributed to the refinery in the feed.
	return &Event{
		Time:    ev.Timestamp,
		Type:    mapMQEventType(ev.Type),
		Actor:   "refinery",
		Target:  ev.MRID,
		Message: formatMQEventMessage(ev),
		Rig:     ev.Rig,
		Role:    "refinery",
		Raw:     line,
	}
}
// mapMQEventType translates an MQ lifecycle event type into the feed's
// string event type. Unknown types pass through as their string form.
func mapMQEventType(mqType mrqueue.EventType) string {
	known := map[mrqueue.EventType]string{
		mrqueue.EventMergeStarted: "merge_started",
		mrqueue.EventMerged:       "merged",
		mrqueue.EventMergeFailed:  "merge_failed",
		mrqueue.EventMergeSkipped: "merge_skipped",
	}
	if name, ok := known[mqType]; ok {
		return name
	}
	return string(mqType)
}
// formatMQEventMessage creates a human-readable message for an MQ event.
func formatMQEventMessage(e mrqueue.Event) string {
branchInfo := e.Branch
if e.Target != "" {
branchInfo += " -> " + e.Target
}
switch e.Type {
case mrqueue.EventMergeStarted:
return "Merge started: " + branchInfo
case mrqueue.EventMerged:
msg := "Merged: " + branchInfo
if e.MergeCommit != "" {
// Show short commit SHA
sha := e.MergeCommit
if len(sha) > 8 {
sha = sha[:8]
}
msg += " (" + sha + ")"
}
return msg
case mrqueue.EventMergeFailed:
msg := "Merge failed: " + branchInfo
if e.Reason != "" {
msg += " - " + e.Reason
}
return msg
case mrqueue.EventMergeSkipped:
msg := "Merge skipped: " + branchInfo
if e.Reason != "" {
msg += " - " + e.Reason
}
return msg
default:
return string(e.Type) + ": " + branchInfo
}
return nil
}

View File

@@ -1,144 +0,0 @@
package feed
import (
"encoding/json"
"testing"
"time"
"github.com/steveyegge/gastown/internal/mrqueue"
)
// TestParseMQEventLine round-trips each MQ lifecycle event through JSON
// and verifies parseMQEventLine maps it to the expected feed event type,
// target MR ID, message substring, and refinery actor/role attribution.
func TestParseMQEventLine(t *testing.T) {
	tests := []struct {
		name         string
		event        mrqueue.Event
		wantType     string
		wantTarget   string
		wantContains string // Substring in message
	}{
		{
			name: "merge_started",
			event: mrqueue.Event{
				Timestamp: time.Now(),
				Type:      mrqueue.EventMergeStarted,
				MRID:      "mr-123",
				Branch:    "polecat/nux",
				Target:    "main",
				Worker:    "nux",
				Rig:       "gastown",
			},
			wantType:     "merge_started",
			wantTarget:   "mr-123",
			wantContains: "Merge started",
		},
		{
			// Merged events should surface the short (8-char) commit SHA.
			name: "merged",
			event: mrqueue.Event{
				Timestamp:   time.Now(),
				Type:        mrqueue.EventMerged,
				MRID:        "mr-456",
				Branch:      "polecat/toast",
				Target:      "main",
				Worker:      "toast",
				Rig:         "gastown",
				MergeCommit: "abc123def456789",
			},
			wantType:     "merged",
			wantTarget:   "mr-456",
			wantContains: "abc123de", // Short SHA
		},
		{
			// Failure events should carry the failure reason in the message.
			name: "merge_failed",
			event: mrqueue.Event{
				Timestamp: time.Now(),
				Type:      mrqueue.EventMergeFailed,
				MRID:      "mr-789",
				Branch:    "polecat/capable",
				Target:    "main",
				Worker:    "capable",
				Rig:       "gastown",
				Reason:    "conflict in main.go",
			},
			wantType:     "merge_failed",
			wantTarget:   "mr-789",
			wantContains: "conflict in main.go",
		},
		{
			// Skipped events also include the skip reason.
			name: "merge_skipped",
			event: mrqueue.Event{
				Timestamp: time.Now(),
				Type:      mrqueue.EventMergeSkipped,
				MRID:      "mr-999",
				Branch:    "polecat/skip",
				Target:    "main",
				Reason:    "already merged",
			},
			wantType:     "merge_skipped",
			wantTarget:   "mr-999",
			wantContains: "already merged",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Marshal to JSON line
			data, err := json.Marshal(tt.event)
			if err != nil {
				t.Fatalf("Failed to marshal event: %v", err)
			}
			// Parse the line
			result := parseMQEventLine(string(data))
			if result == nil {
				t.Fatal("parseMQEventLine returned nil")
			}
			if result.Type != tt.wantType {
				t.Errorf("Type = %q, want %q", result.Type, tt.wantType)
			}
			if result.Target != tt.wantTarget {
				t.Errorf("Target = %q, want %q", result.Target, tt.wantTarget)
			}
			if tt.wantContains != "" && !contains(result.Message, tt.wantContains) {
				t.Errorf("Message = %q, want to contain %q", result.Message, tt.wantContains)
			}
			// Actor should be refinery
			if result.Actor != "refinery" {
				t.Errorf("Actor = %q, want %q", result.Actor, "refinery")
			}
			if result.Role != "refinery" {
				t.Errorf("Role = %q, want %q", result.Role, "refinery")
			}
		})
	}
}
// TestParseMQEventLineEmpty verifies that blank, whitespace-only, and
// non-JSON input all produce a nil event rather than an error or panic.
func TestParseMQEventLineEmpty(t *testing.T) {
	cases := []struct {
		input string
		msg   string
	}{
		{"", "Expected nil for empty line"},
		{" ", "Expected nil for whitespace-only line"},
		{"not valid json", "Expected nil for invalid JSON"},
	}
	for _, c := range cases {
		if parseMQEventLine(c.input) != nil {
			t.Error(c.msg)
		}
	}
}
// contains reports whether substr occurs anywhere within s.
// It is a local substring check so the test file avoids an extra import.
func contains(s, substr string) bool {
	for start := 0; start+len(substr) <= len(s); start++ {
		if s[start:start+len(substr)] == substr {
			return true
		}
	}
	return false
}