diff --git a/internal/cmd/mq_list.go b/internal/cmd/mq_list.go index 0feac99b..8ab71631 100644 --- a/internal/cmd/mq_list.go +++ b/internal/cmd/mq_list.go @@ -10,7 +10,7 @@ import ( "github.com/spf13/cobra" "github.com/steveyegge/gastown/internal/beads" - "github.com/steveyegge/gastown/internal/mrqueue" + "github.com/steveyegge/gastown/internal/refinery" "github.com/steveyegge/gastown/internal/style" ) @@ -260,7 +260,7 @@ func outputJSON(data interface{}) error { return enc.Encode(data) } -// calculateMRScore computes the priority score for an MR using the mrqueue scoring function. +// calculateMRScore computes the priority score for an MR using the refinery scoring function. // Higher scores mean higher priority (process first). func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time) float64 { // Parse MR creation time @@ -273,7 +273,7 @@ func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time) } // Build score input - input := mrqueue.ScoreInput{ + input := refinery.ScoreInput{ Priority: issue.Priority, MRCreatedAt: mrCreatedAt, Now: now, @@ -291,5 +291,5 @@ func calculateMRScore(issue *beads.Issue, fields *beads.MRFields, now time.Time) } } - return mrqueue.ScoreMRWithDefaults(input) + return refinery.ScoreMRWithDefaults(input) } diff --git a/internal/cmd/refinery.go b/internal/cmd/refinery.go index cf44cd25..4ca2ffcf 100644 --- a/internal/cmd/refinery.go +++ b/internal/cmd/refinery.go @@ -6,7 +6,7 @@ import ( "os" "github.com/spf13/cobra" - "github.com/steveyegge/gastown/internal/mrqueue" + "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/refinery" "github.com/steveyegge/gastown/internal/rig" "github.com/steveyegge/gastown/internal/style" @@ -540,19 +540,23 @@ func runRefineryClaim(cmd *cobra.Command, args []string) error { mrID := args[0] workerID := getWorkerID() - // Find the queue from current working directory - q, err := mrqueue.NewFromWorkdir(".") + // Find beads from current working directory + townRoot, err := workspace.FindFromCwdOrError() if err != nil { - return fmt.Errorf("finding merge queue: %w", err) + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + rigName, err := inferRigFromCwd(townRoot) + if err != nil { + return fmt.Errorf("could not determine rig: %w", err) } - if err := q.Claim(mrID, workerID); err != nil { - if err == mrqueue.ErrNotFound { - return fmt.Errorf("MR %s not found in queue", mrID) - } - if err == mrqueue.ErrAlreadyClaimed { - return fmt.Errorf("MR %s is already claimed by another worker", mrID) - } + _, r, err := getRig(rigName) + if err != nil { + return err + } + + eng := refinery.NewEngineer(r) + if err := eng.ClaimMR(mrID, workerID); err != nil { return fmt.Errorf("claiming MR: %w", err) } @@ -563,12 +567,23 @@ func runRefineryClaim(cmd *cobra.Command, args []string) error { func runRefineryRelease(cmd *cobra.Command, args []string) error { mrID := args[0] - q, err := mrqueue.NewFromWorkdir(".") + // Find beads from current working directory + townRoot, err := workspace.FindFromCwdOrError() if err != nil { - return fmt.Errorf("finding merge queue: %w", err) + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + rigName, err := inferRigFromCwd(townRoot) + if err != nil { + return fmt.Errorf("could not determine rig: %w", err) } - if err := q.Release(mrID); err != nil { + _, r, err := getRig(rigName) + if err != nil { + return err + } + + eng := refinery.NewEngineer(r) + if err := eng.ReleaseMR(mrID); err != nil { return fmt.Errorf("releasing MR: %w", err) } @@ -587,10 +602,35 @@ func runRefineryUnclaimed(cmd *cobra.Command, args []string) error { return err } - q := mrqueue.New(r.Path) - unclaimed, err := q.ListUnclaimed() + // Query beads for merge-request issues without assignee + b := beads.New(r.Path) + issues, err := b.List(beads.ListOptions{ + Status: "open", + Label: "gt:merge-request", + Priority: -1, + }) if err != nil { - return fmt.Errorf("listing unclaimed MRs: %w", err) + return fmt.Errorf("listing merge requests: %w", err) + } + + // Filter for unclaimed (no assignee) + var unclaimed []*refinery.MRInfo + for _, issue := range issues { + if issue.Assignee != "" { + continue + } + fields := beads.ParseMRFields(issue) + if fields == nil { + continue + } + mr := &refinery.MRInfo{ + ID: issue.ID, + Branch: fields.Branch, + Target: fields.Target, + Worker: fields.Worker, + Priority: issue.Priority, + } + unclaimed = append(unclaimed, mr) } // JSON output diff --git a/internal/mrqueue/events.go b/internal/mrqueue/events.go deleted file mode 100644 index 1afbe85c..00000000 --- a/internal/mrqueue/events.go +++ /dev/null @@ -1,152 +0,0 @@ -// Package mrqueue provides merge request queue storage and events. -package mrqueue - -import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" - "time" -) - -// EventType represents the type of MQ lifecycle event. -type EventType string - -const ( - // EventMergeStarted indicates refinery began processing an MR. - EventMergeStarted EventType = "merge_started" - // EventMerged indicates an MR was successfully merged. - EventMerged EventType = "merged" - // EventMergeFailed indicates a merge failed (conflict, tests, etc.). - EventMergeFailed EventType = "merge_failed" - // EventMergeSkipped indicates an MR was skipped (already merged, etc.). - EventMergeSkipped EventType = "merge_skipped" -) - -// Event represents a single MQ lifecycle event. -type Event struct { - Timestamp time.Time `json:"timestamp"` - Type EventType `json:"type"` - MRID string `json:"mr_id"` - Branch string `json:"branch"` - Target string `json:"target"` - Worker string `json:"worker,omitempty"` - SourceIssue string `json:"source_issue,omitempty"` - Rig string `json:"rig,omitempty"` - MergeCommit string `json:"merge_commit,omitempty"` // For merged events - Reason string `json:"reason,omitempty"` // For failed/skipped events -} - -// EventLogger handles writing MQ events to the event log. -type EventLogger struct { - logPath string - mu sync.Mutex -} - -// NewEventLogger creates a new EventLogger for the given beads directory. -func NewEventLogger(beadsDir string) *EventLogger { - return &EventLogger{ - logPath: filepath.Join(beadsDir, "mq_events.jsonl"), - } -} - -// NewEventLoggerFromRig creates an EventLogger for the given rig path. -func NewEventLoggerFromRig(rigPath string) *EventLogger { - return NewEventLogger(filepath.Join(rigPath, ".beads")) -} - -// LogEvent writes an event to the MQ event log. -func (l *EventLogger) LogEvent(event Event) error { - l.mu.Lock() - defer l.mu.Unlock() - - // Ensure timestamp is set - if event.Timestamp.IsZero() { - event.Timestamp = time.Now() - } - - // Ensure log directory exists - if err := os.MkdirAll(filepath.Dir(l.logPath), 0755); err != nil { - return fmt.Errorf("creating log directory: %w", err) - } - - // Marshal event to JSON - data, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("marshaling event: %w", err) - } - - // Append to log file - f, err := os.OpenFile(l.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - return fmt.Errorf("opening event log: %w", err) - } - defer f.Close() - - if _, err := f.Write(append(data, '\n')); err != nil { - return fmt.Errorf("writing event: %w", err) - } - - return nil -} - -// LogMergeStarted logs a merge_started event. -func (l *EventLogger) LogMergeStarted(mr *MR) error { - return l.LogEvent(Event{ - Type: EventMergeStarted, - MRID: mr.ID, - Branch: mr.Branch, - Target: mr.Target, - Worker: mr.Worker, - SourceIssue: mr.SourceIssue, - Rig: mr.Rig, - }) -} - -// LogMerged logs a merged event. -func (l *EventLogger) LogMerged(mr *MR, mergeCommit string) error { - return l.LogEvent(Event{ - Type: EventMerged, - MRID: mr.ID, - Branch: mr.Branch, - Target: mr.Target, - Worker: mr.Worker, - SourceIssue: mr.SourceIssue, - Rig: mr.Rig, - MergeCommit: mergeCommit, - }) -} - -// LogMergeFailed logs a merge_failed event. -func (l *EventLogger) LogMergeFailed(mr *MR, reason string) error { - return l.LogEvent(Event{ - Type: EventMergeFailed, - MRID: mr.ID, - Branch: mr.Branch, - Target: mr.Target, - Worker: mr.Worker, - SourceIssue: mr.SourceIssue, - Rig: mr.Rig, - Reason: reason, - }) -} - -// LogMergeSkipped logs a merge_skipped event. -func (l *EventLogger) LogMergeSkipped(mr *MR, reason string) error { - return l.LogEvent(Event{ - Type: EventMergeSkipped, - MRID: mr.ID, - Branch: mr.Branch, - Target: mr.Target, - Worker: mr.Worker, - SourceIssue: mr.SourceIssue, - Rig: mr.Rig, - Reason: reason, - }) -} - -// LogPath returns the path to the event log file. -func (l *EventLogger) LogPath() string { - return l.logPath -} diff --git a/internal/mrqueue/events_test.go b/internal/mrqueue/events_test.go deleted file mode 100644 index cc5ed668..00000000 --- a/internal/mrqueue/events_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package mrqueue - -import ( - "encoding/json" - "os" - "path/filepath" - "testing" - "time" -) - -func TestEventLogger(t *testing.T) { - // Create temp directory - tmpDir, err := os.MkdirTemp("", "mrqueue-test") - if err != nil { - t.Fatalf("Failed to create temp dir: %v", err) - } - defer os.RemoveAll(tmpDir) - - beadsDir := filepath.Join(tmpDir, ".beads") - if err := os.MkdirAll(beadsDir, 0755); err != nil { - t.Fatalf("Failed to create beads dir: %v", err) - } - - logger := NewEventLogger(beadsDir) - - // Test MR - mr := &MR{ - ID: "mr-test-123", - Branch: "polecat/test", - Target: "main", - SourceIssue: "gt-abc", - Worker: "test-worker", - Rig: "test-rig", - } - - // Log merge_started - if err := logger.LogMergeStarted(mr); err != nil { - t.Errorf("LogMergeStarted failed: %v", err) - } - - // Log merged - if err := logger.LogMerged(mr, "abc123def456"); err != nil { - t.Errorf("LogMerged failed: %v", err) - } - - // Log merge_failed - if err := logger.LogMergeFailed(mr, "conflict in file.go"); err != nil { - t.Errorf("LogMergeFailed failed: %v", err) - } - - // Log merge_skipped - if err := logger.LogMergeSkipped(mr, "already merged"); err != nil { - t.Errorf("LogMergeSkipped failed: %v", err) - } - - // Read and verify events - logPath := logger.LogPath() - data, err := os.ReadFile(logPath) - if err != nil { - t.Fatalf("Failed to read log file: %v", err) - } - - lines := splitLines(string(data)) - if len(lines) != 4 { - t.Errorf("Expected 4 events, got %d", len(lines)) - } - - // Verify each event type - expectedTypes := []EventType{EventMergeStarted, EventMerged, EventMergeFailed, EventMergeSkipped} - for i, line := range lines { - if line == "" { - continue - } - var event Event - if err := json.Unmarshal([]byte(line), &event); err != nil { - t.Errorf("Failed to parse event %d: %v", i, err) - continue - } - - if event.Type != expectedTypes[i] { - t.Errorf("Event %d: expected type %s, got %s", i, expectedTypes[i], event.Type) - } - - if event.MRID != mr.ID { - t.Errorf("Event %d: expected MR ID %s, got %s", i, mr.ID, event.MRID) - } - - if event.Branch != mr.Branch { - t.Errorf("Event %d: expected branch %s, got %s", i, mr.Branch, event.Branch) - } - - // Check timestamp is recent - if time.Since(event.Timestamp) > time.Minute { - t.Errorf("Event %d: timestamp too old: %v", i, event.Timestamp) - } - } -} - -func splitLines(s string) []string { - var lines []string - start := 0 - for i := 0; i < len(s); i++ { - if s[i] == '\n' { - if start < i { - lines = append(lines, s[start:i]) - } - start = i + 1 - } - } - if start < len(s) { - lines = append(lines, s[start:]) - } - return lines -} diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go deleted file mode 100644 index c3c3576e..00000000 --- a/internal/mrqueue/mrqueue.go +++ /dev/null @@ -1,469 +0,0 @@ -// Package mrqueue provides merge request queue storage. -// MRs are stored locally in .beads/mq/ and deleted after merge. -// This avoids sync overhead for transient MR state. -package mrqueue - -import ( - "crypto/rand" - "encoding/hex" - "encoding/json" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - "time" -) - -// MR represents a merge request in the queue. -type MR struct { - ID string `json:"id"` - Branch string `json:"branch"` // Source branch (e.g., "polecat/nux") - Target string `json:"target"` // Target branch (e.g., "main") - SourceIssue string `json:"source_issue"` // The work item being merged - Worker string `json:"worker"` // Who did the work - Rig string `json:"rig"` // Which rig - Title string `json:"title"` // MR title - Priority int `json:"priority"` // Priority (lower = higher priority) - CreatedAt time.Time `json:"created_at"` - AgentBead string `json:"agent_bead,omitempty"` // Agent bead ID that created this MR (for traceability) - - // Priority scoring fields - RetryCount int `json:"retry_count,omitempty"` // Conflict retry count for priority penalty - ConvoyID string `json:"convoy_id,omitempty"` // Parent convoy ID if part of a convoy - ConvoyCreatedAt *time.Time `json:"convoy_created_at,omitempty"` // Convoy creation time for starvation prevention - - // Claiming fields for parallel refinery workers - ClaimedBy string `json:"claimed_by,omitempty"` // Worker ID that claimed this MR - ClaimedAt *time.Time `json:"claimed_at,omitempty"` // When the MR was claimed - - // Blocking fields for non-blocking delegation - BlockedBy string `json:"blocked_by,omitempty"` // Task ID that blocks this MR (e.g., conflict resolution task) -} - -// Queue manages the MR storage. -type Queue struct { - dir string // .beads/mq/ directory -} - -// New creates a new MR queue for the given rig path. -func New(rigPath string) *Queue { - return &Queue{ - dir: filepath.Join(rigPath, ".beads", "mq"), - } -} - -// NewFromWorkdir creates a queue by finding the rig root from a working directory. -func NewFromWorkdir(workdir string) (*Queue, error) { - // Walk up to find .beads or rig root - dir := workdir - for { - beadsDir := filepath.Join(dir, ".beads") - if info, err := os.Stat(beadsDir); err == nil && info.IsDir() { - return &Queue{dir: filepath.Join(beadsDir, "mq")}, nil - } - - parent := filepath.Dir(dir) - if parent == dir { - return nil, fmt.Errorf("could not find .beads directory from %s", workdir) - } - dir = parent - } -} - -// EnsureDir creates the MQ directory if it doesn't exist. -func (q *Queue) EnsureDir() error { - return os.MkdirAll(q.dir, 0755) -} - -// generateID creates a unique MR ID. -func generateID() string { - b := make([]byte, 4) - _, _ = rand.Read(b) - return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(b)) -} - -// Submit adds a new MR to the queue. -func (q *Queue) Submit(mr *MR) error { - if err := q.EnsureDir(); err != nil { - return fmt.Errorf("creating mq directory: %w", err) - } - - if mr.ID == "" { - mr.ID = generateID() - } - if mr.CreatedAt.IsZero() { - mr.CreatedAt = time.Now() - } - - data, err := json.MarshalIndent(mr, "", " ") - if err != nil { - return fmt.Errorf("marshaling MR: %w", err) - } - - path := filepath.Join(q.dir, mr.ID+".json") - if err := os.WriteFile(path, data, 0644); err != nil { - return fmt.Errorf("writing MR file: %w", err) - } - - return nil -} - -// List returns all pending MRs, sorted by priority then creation time. -// Deprecated: Use ListByScore for priority-aware ordering. -func (q *Queue) List() ([]*MR, error) { - entries, err := os.ReadDir(q.dir) - if err != nil { - if os.IsNotExist(err) { - return nil, nil // Empty queue - } - return nil, fmt.Errorf("reading mq directory: %w", err) - } - - var mrs []*MR - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { - continue - } - - mr, err := q.load(filepath.Join(q.dir, entry.Name())) - if err != nil { - continue // Skip malformed files - } - mrs = append(mrs, mr) - } - - // Sort by priority (lower first), then by creation time (older first) - sort.Slice(mrs, func(i, j int) bool { - if mrs[i].Priority != mrs[j].Priority { - return mrs[i].Priority < mrs[j].Priority - } - return mrs[i].CreatedAt.Before(mrs[j].CreatedAt) - }) - - return mrs, nil -} - -// ListByScore returns all pending MRs sorted by priority score (highest first). -// Uses the ScoreMR function which considers: -// - Convoy age (prevents starvation) -// - Issue priority (P0-P4) -// - Retry count (prevents thrashing) -// - MR age (FIFO tiebreaker) -func (q *Queue) ListByScore() ([]*MR, error) { - entries, err := os.ReadDir(q.dir) - if err != nil { - if os.IsNotExist(err) { - return nil, nil // Empty queue - } - return nil, fmt.Errorf("reading mq directory: %w", err) - } - - now := time.Now() - var mrs []*MR - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { - continue - } - - mr, err := q.load(filepath.Join(q.dir, entry.Name())) - if err != nil { - continue // Skip malformed files - } - mrs = append(mrs, mr) - } - - // Sort by score (higher first = higher priority) - sort.Slice(mrs, func(i, j int) bool { - return mrs[i].ScoreAt(now) > mrs[j].ScoreAt(now) - }) - - return mrs, nil -} - -// Get retrieves a specific MR by ID. -func (q *Queue) Get(id string) (*MR, error) { - path := filepath.Join(q.dir, id+".json") - return q.load(path) -} - -// load reads an MR from a file path. -func (q *Queue) load(path string) (*MR, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - - var mr MR - if err := json.Unmarshal(data, &mr); err != nil { - return nil, err - } - - return &mr, nil -} - -// Remove deletes an MR from the queue (after successful merge). -func (q *Queue) Remove(id string) error { - path := filepath.Join(q.dir, id+".json") - err := os.Remove(path) - if os.IsNotExist(err) { - return nil // Already removed - } - return err -} - -// Count returns the number of pending MRs. -func (q *Queue) Count() int { - entries, err := os.ReadDir(q.dir) - if err != nil { - return 0 - } - - count := 0 - for _, entry := range entries { - if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") { - count++ - } - } - return count -} - -// Dir returns the queue directory path. -func (q *Queue) Dir() string { - return q.dir -} - -// ClaimStaleTimeout is how long before a claimed MR is considered stale. -// If a worker claims an MR but doesn't process it within this time, -// another worker can reclaim it. -const ClaimStaleTimeout = 10 * time.Minute - -// Claim attempts to claim an MR for processing by a specific worker. -// Returns nil if successful, ErrAlreadyClaimed if another worker has it, -// or ErrNotFound if the MR doesn't exist. -// Uses atomic file operations to prevent race conditions. -func (q *Queue) Claim(id, workerID string) error { - path := filepath.Join(q.dir, id+".json") - - // Read current state - mr, err := q.load(path) - if err != nil { - if os.IsNotExist(err) { - return ErrNotFound - } - return fmt.Errorf("loading MR: %w", err) - } - - // Check if already claimed by another worker - if mr.ClaimedBy != "" && mr.ClaimedBy != workerID { - // Check if claim is stale (worker may have crashed) - if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout { - return ErrAlreadyClaimed - } - // Stale claim - allow reclaim - } - - // Claim the MR - now := time.Now() - mr.ClaimedBy = workerID - mr.ClaimedAt = &now - - // Write atomically - data, err := json.MarshalIndent(mr, "", " ") - if err != nil { - return fmt.Errorf("marshaling MR: %w", err) - } - - // Write to temp file first, then rename (atomic on most filesystems) - tmpPath := path + ".tmp" - if err := os.WriteFile(tmpPath, data, 0644); err != nil { - return fmt.Errorf("writing temp file: %w", err) - } - if err := os.Rename(tmpPath, path); err != nil { - _ = os.Remove(tmpPath) // cleanup - return fmt.Errorf("renaming temp file: %w", err) - } - - return nil -} - -// Release releases a claimed MR back to the queue. -// Called when processing fails and the MR should be retried. -func (q *Queue) Release(id string) error { - path := filepath.Join(q.dir, id+".json") - - mr, err := q.load(path) - if err != nil { - if os.IsNotExist(err) { - return nil // Already removed - } - return fmt.Errorf("loading MR: %w", err) - } - - // Clear claim - mr.ClaimedBy = "" - mr.ClaimedAt = nil - - data, err := json.MarshalIndent(mr, "", " ") - if err != nil { - return fmt.Errorf("marshaling MR: %w", err) - } - - return os.WriteFile(path, data, 0644) -} - -// ListUnclaimed returns MRs that are not claimed or have stale claims. -// Sorted by priority then creation time. -func (q *Queue) ListUnclaimed() ([]*MR, error) { - all, err := q.List() - if err != nil { - return nil, err - } - - var unclaimed []*MR - for _, mr := range all { - if mr.ClaimedBy == "" { - unclaimed = append(unclaimed, mr) - continue - } - // Check if claim is stale - if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) >= ClaimStaleTimeout { - unclaimed = append(unclaimed, mr) - } - } - - return unclaimed, nil -} - -// ListClaimedBy returns MRs claimed by a specific worker. -func (q *Queue) ListClaimedBy(workerID string) ([]*MR, error) { - all, err := q.List() - if err != nil { - return nil, err - } - - var claimed []*MR - for _, mr := range all { - if mr.ClaimedBy == workerID { - claimed = append(claimed, mr) - } - } - - return claimed, nil -} - -// Common errors for claiming -var ( - ErrNotFound = fmt.Errorf("merge request not found") - ErrAlreadyClaimed = fmt.Errorf("merge request already claimed by another worker") -) - -// SetBlockedBy marks an MR as blocked by a task (e.g., conflict resolution). -// When the blocking task closes, the MR becomes ready for processing again. -func (q *Queue) SetBlockedBy(mrID, taskID string) error { - path := filepath.Join(q.dir, mrID+".json") - - mr, err := q.load(path) - if err != nil { - if os.IsNotExist(err) { - return ErrNotFound - } - return fmt.Errorf("loading MR: %w", err) - } - - mr.BlockedBy = taskID - - data, err := json.MarshalIndent(mr, "", " ") - if err != nil { - return fmt.Errorf("marshaling MR: %w", err) - } - - return os.WriteFile(path, data, 0644) -} - -// ClearBlockedBy removes the blocking task from an MR. -func (q *Queue) ClearBlockedBy(mrID string) error { - return q.SetBlockedBy(mrID, "") -} - -// IsBlocked checks if an MR is blocked by a task that is still open. -// If blocked, returns true and the blocking task ID. -// checkStatus is a function that checks if a bead is still open. -func (mr *MR) IsBlocked(checkStatus func(beadID string) (isOpen bool, err error)) (bool, string, error) { - if mr.BlockedBy == "" { - return false, "", nil - } - - isOpen, err := checkStatus(mr.BlockedBy) - if err != nil { - // If we can't check status, assume not blocked (fail open) - return false, "", nil - } - - return isOpen, mr.BlockedBy, nil -} - -// BeadStatusChecker is a function type that checks if a bead is open. -// Returns true if the bead is open (not closed), false if closed or not found. -type BeadStatusChecker func(beadID string) (isOpen bool, err error) - -// ListReady returns MRs that are ready for processing: -// - Not claimed by another worker (or claim is stale) -// - Not blocked by an open task -// Sorted by priority score (highest first). -// The checkStatus function is used to check if blocking tasks are still open. -func (q *Queue) ListReady(checkStatus BeadStatusChecker) ([]*MR, error) { - all, err := q.ListByScore() - if err != nil { - return nil, err - } - - var ready []*MR - for _, mr := range all { - // Skip if claimed by another worker (and not stale) - if mr.ClaimedBy != "" { - if mr.ClaimedAt != nil && time.Since(*mr.ClaimedAt) < ClaimStaleTimeout { - continue - } - // Stale claim - include in ready list - } - - // Skip if blocked by an open task - if mr.BlockedBy != "" && checkStatus != nil { - isOpen, err := checkStatus(mr.BlockedBy) - if err == nil && isOpen { - // Blocked by an open task - skip - continue - } - // If error or task closed, proceed (fail open) - } - - ready = append(ready, mr) - } - - return ready, nil -} - -// ListBlocked returns MRs that are blocked by open tasks. -// Useful for reporting/monitoring. -func (q *Queue) ListBlocked(checkStatus BeadStatusChecker) ([]*MR, error) { - all, err := q.List() - if err != nil { - return nil, err - } - - var blocked []*MR - for _, mr := range all { - if mr.BlockedBy == "" { - continue - } - if checkStatus != nil { - isOpen, err := checkStatus(mr.BlockedBy) - if err == nil && isOpen { - blocked = append(blocked, mr) - } - } - } - - return blocked, nil -} diff --git a/internal/mrqueue/priority_test.go b/internal/mrqueue/priority_test.go deleted file mode 100644 index fd71b745..00000000 --- a/internal/mrqueue/priority_test.go +++ /dev/null @@ -1,334 +0,0 @@ -package mrqueue - -import ( - "testing" - "time" -) - -func TestScoreMR_BaseScore(t *testing.T) { - now := time.Now() - config := DefaultScoreConfig() - - input := ScoreInput{ - Priority: 2, // P2 (medium) - MRCreatedAt: now, - RetryCount: 0, - Now: now, - } - - score := ScoreMR(input, config) - - // BaseScore(1000) + Priority(2 gives 4-2=2, so 2*100=200) = 1200 - expected := 1200.0 - if score != expected { - t.Errorf("expected score %f, got %f", expected, score) - } -} - -func TestScoreMR_PriorityOrdering(t *testing.T) { - now := time.Now() - - tests := []struct { - priority int - expected float64 - }{ - {0, 1400.0}, // P0: base(1000) + (4-0)*100 = 1400 - {1, 1300.0}, // P1: base(1000) + (4-1)*100 = 1300 - {2, 1200.0}, // P2: base(1000) + (4-2)*100 = 1200 - {3, 1100.0}, // P3: base(1000) + (4-3)*100 = 1100 - {4, 1000.0}, // P4: base(1000) + (4-4)*100 = 1000 - } - - for _, tt := range tests { - t.Run("P"+string(rune('0'+tt.priority)), func(t *testing.T) { - input := ScoreInput{ - Priority: tt.priority, - MRCreatedAt: now, - Now: now, - } - score := ScoreMRWithDefaults(input) - if score != tt.expected { - t.Errorf("P%d: expected %f, got %f", tt.priority, tt.expected, score) - } - }) - } - - // Verify ordering: P0 > P1 > P2 > P3 > P4 - for i := 0; i < 4; i++ { - input1 := ScoreInput{Priority: i, MRCreatedAt: now, Now: now} - input2 := ScoreInput{Priority: i + 1, MRCreatedAt: now, Now: now} - score1 := ScoreMRWithDefaults(input1) - score2 := ScoreMRWithDefaults(input2) - if score1 <= score2 { - t.Errorf("P%d (%f) should score higher than P%d (%f)", i, score1, i+1, score2) - } - } -} - -func TestScoreMR_ConvoyAgeEscalation(t *testing.T) { - now := time.Now() - config := DefaultScoreConfig() - - // MR without convoy - noConvoy := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - Now: now, - } - scoreNoConvoy := ScoreMR(noConvoy, config) - - // MR with 24-hour old convoy - convoyTime := now.Add(-24 * time.Hour) - withConvoy := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - ConvoyCreatedAt: &convoyTime, - Now: now, - } - scoreWithConvoy := ScoreMR(withConvoy, config) - - // 24 hours * 10 pts/hour = 240 extra points - expectedDiff := 240.0 - actualDiff := scoreWithConvoy - scoreNoConvoy - if actualDiff != expectedDiff { - t.Errorf("expected convoy age to add %f pts, got %f", expectedDiff, actualDiff) - } -} - -func TestScoreMR_ConvoyStarvationPrevention(t *testing.T) { - now := time.Now() - - // P4 issue in 48-hour old convoy vs P0 issue with no convoy - oldConvoy := now.Add(-48 * time.Hour) - lowPriorityOldConvoy := ScoreInput{ - Priority: 4, // P4 (lowest) - MRCreatedAt: now, - ConvoyCreatedAt: &oldConvoy, - Now: now, - } - - highPriorityNoConvoy := ScoreInput{ - Priority: 0, // P0 (highest) - MRCreatedAt: now, - Now: now, - } - - scoreOldConvoy := ScoreMRWithDefaults(lowPriorityOldConvoy) - scoreHighPriority := ScoreMRWithDefaults(highPriorityNoConvoy) - - // P4 with 48h convoy: 1000 + 0 + 480 = 1480 - // P0 with no convoy: 1000 + 400 + 0 = 1400 - // Old convoy should win (starvation prevention) - if scoreOldConvoy <= scoreHighPriority { - t.Errorf("48h old P4 convoy (%f) should beat P0 no convoy (%f) for starvation prevention", - scoreOldConvoy, scoreHighPriority) - } -} - -func TestScoreMR_RetryPenalty(t *testing.T) { - now := time.Now() - config := DefaultScoreConfig() - - // No retries - noRetry := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - RetryCount: 0, - Now: now, - } - scoreNoRetry := ScoreMR(noRetry, config) - - // 3 retries - threeRetries := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - RetryCount: 3, - Now: now, - } - scoreThreeRetries := ScoreMR(threeRetries, config) - - // 3 retries * 50 pts penalty = 150 pts less - expectedDiff := 150.0 - actualDiff := scoreNoRetry - scoreThreeRetries - if actualDiff != expectedDiff { - t.Errorf("expected 3 retries to lose %f pts, lost %f", expectedDiff, actualDiff) - } -} - -func TestScoreMR_RetryPenaltyCapped(t *testing.T) { - now := time.Now() - config := DefaultScoreConfig() - - // Max penalty is 300, so 10 retries should be same as 6 - sixRetries := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - RetryCount: 6, - Now: now, - } - tenRetries := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - RetryCount: 10, - Now: now, - } - - scoreSix := ScoreMR(sixRetries, config) - scoreTen := ScoreMR(tenRetries, config) - - if scoreSix != scoreTen { - t.Errorf("penalty should be capped: 6 retries (%f) should equal 10 retries (%f)", - scoreSix, scoreTen) - } - - // Both should be base(1000) + priority(200) - maxPenalty(300) = 900 - expected := 900.0 - if scoreSix != expected { - t.Errorf("expected capped score %f, got %f", expected, scoreSix) - } -} - -func TestScoreMR_MRAgeAsTiebreaker(t *testing.T) { - now := time.Now() - - // Two MRs with same priority, one submitted 10 hours ago - oldMR := ScoreInput{ - Priority: 2, - MRCreatedAt: now.Add(-10 * time.Hour), - Now: now, - } - newMR := ScoreInput{ - Priority: 2, - MRCreatedAt: now, - Now: now, - } - - scoreOld := ScoreMRWithDefaults(oldMR) - scoreNew := ScoreMRWithDefaults(newMR) - - // Old MR should have 10 pts more (1 pt/hour) - expectedDiff := 10.0 - actualDiff := scoreOld - scoreNew - if actualDiff != expectedDiff { - t.Errorf("older MR should score %f more, got %f", expectedDiff, actualDiff) - } -} - -func TestScoreMR_Deterministic(t *testing.T) { - fixedNow := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) - convoyTime := time.Date(2024, 12, 31, 12, 0, 0, 0, time.UTC) - mrTime := time.Date(2025, 1, 1, 10, 0, 0, 0, time.UTC) - - input := ScoreInput{ - Priority: 1, - MRCreatedAt: mrTime, - ConvoyCreatedAt: &convoyTime, - RetryCount: 2, - Now: fixedNow, - } - - // Run 100 times, should always be same - first := ScoreMRWithDefaults(input) - for i := 0; i < 100; i++ { - score := ScoreMRWithDefaults(input) - if score != first { - t.Errorf("score not deterministic: iteration %d got %f, expected %f", i, score, first) - } - } -} - -func TestScoreMR_InvalidPriorityClamped(t *testing.T) { - now := time.Now() - - // Negative priority should clamp to 0 bonus (priority=4) - negativePriority := ScoreInput{ - Priority: -1, - MRCreatedAt: now, - Now: now, - } - scoreNegative := ScoreMRWithDefaults(negativePriority) - - // Very high priority should clamp to max bonus (priority=0) - highPriority := ScoreInput{ - Priority: 10, - MRCreatedAt: now, - Now: now, - } - scoreHigh := ScoreMRWithDefaults(highPriority) - - // Negative priority gets clamped to max bonus (4*100=400) - if scoreNegative != 1400.0 { - t.Errorf("negative priority should clamp to P0 bonus, got %f", scoreNegative) - } - - // High priority (10) gives 4-10=-6, clamped to 0 - if scoreHigh != 1000.0 { - t.Errorf("priority>4 should give 0 bonus, got %f", scoreHigh) - } -} - -func TestMR_Score(t *testing.T) { - now := time.Now() - convoyTime := now.Add(-12 * time.Hour) - - mr := &MR{ - Priority: 1, - CreatedAt: now.Add(-2 * time.Hour), - ConvoyCreatedAt: &convoyTime, - RetryCount: 1, - } - - score := mr.ScoreAt(now) - - // base(1000) + convoy(12*10=120) + priority(3*100=300) - retry(1*50=50) + mrAge(2*1=2) - expected := 1000.0 + 120.0 + 300.0 - 50.0 + 2.0 - if score != expected { - t.Errorf("MR.ScoreAt expected %f, got %f", expected, score) - } -} - -func TestScoreMR_EdgeCases(t *testing.T) { - now := time.Now() - - tests := []struct { - name string - input ScoreInput - }{ - { - name: "zero time MR", - input: ScoreInput{ - Priority: 2, - MRCreatedAt: time.Time{}, - Now: now, - }, - }, - { - name: "future MR", - input: ScoreInput{ - Priority: 2, - MRCreatedAt: now.Add(24 * time.Hour), - Now: now, - }, - }, - { - name: "future convoy", - input: ScoreInput{ - Priority: 2, - MRCreatedAt: now, - ConvoyCreatedAt: func() *time.Time { t := now.Add(24 * time.Hour); return &t }(), - Now: now, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Should not panic - score := ScoreMRWithDefaults(tt.input) - // Score should still be reasonable (>= base - maxPenalty) - if score < 700 { - t.Errorf("score %f unexpectedly low for edge case", score) - } - }) - } -} diff --git a/internal/protocol/refinery_handlers.go b/internal/protocol/refinery_handlers.go index 9829278c..c1bffc5c 100644 --- a/internal/protocol/refinery_handlers.go +++ b/internal/protocol/refinery_handlers.go @@ -4,14 +4,13 @@ import ( "fmt" "io" "os" - "time" "github.com/steveyegge/gastown/internal/mail" - "github.com/steveyegge/gastown/internal/mrqueue" ) // DefaultRefineryHandler provides the default implementation for Refinery protocol handlers. -// It receives MERGE_READY messages from the Witness and adds work to the merge queue. +// It receives MERGE_READY messages from the Witness and acknowledges verified work. +// Note: The Refinery now queries beads directly for merge requests (via ReadyWithType). type DefaultRefineryHandler struct { // Rig is the name of the rig this refinery processes. Rig string @@ -19,9 +18,6 @@ type DefaultRefineryHandler struct { // WorkDir is the working directory for operations. WorkDir string - // Queue is the merge request queue. - Queue *mrqueue.Queue - // Router is used to send mail messages. Router *mail.Router @@ -34,7 +30,6 @@ func NewRefineryHandler(rig, workDir string) *DefaultRefineryHandler { return &DefaultRefineryHandler{ Rig: rig, WorkDir: workDir, - Queue: mrqueue.New(workDir), Router: mail.NewRouter(workDir), Output: os.Stdout, } @@ -46,10 +41,10 @@ func (h *DefaultRefineryHandler) SetOutput(w io.Writer) { } // HandleMergeReady handles a MERGE_READY message from Witness. -// When a polecat's work is verified and ready, the Refinery: -// 1. Validates the merge request -// 2. Adds it to the merge queue -// 3. Acknowledges receipt +// When a polecat's work is verified and ready, the Refinery acknowledges receipt. +// +// NOTE: The merge-request bead is created by `gt done`, so we no longer need +// to add to the mrqueue here. The Refinery queries beads directly for ready MRs. func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error { _, _ = fmt.Fprintf(h.Output, "[Refinery] MERGE_READY received for polecat %s\n", payload.Polecat) _, _ = fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch) @@ -64,25 +59,10 @@ func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) er return fmt.Errorf("missing polecat in MERGE_READY payload") } - // Create merge request (ID is generated by Submit if empty) - mr := &mrqueue.MR{ - Branch: payload.Branch, - Worker: payload.Polecat, - SourceIssue: payload.Issue, - Target: "main", // Default target, could be passed in payload - Rig: payload.Rig, - Title: fmt.Sprintf("Merge %s work on %s", payload.Polecat, payload.Issue), - CreatedAt: time.Now(), - } - - // Add to queue - if err := h.Queue.Submit(mr); err != nil { - _, _ = fmt.Fprintf(h.Output, "[Refinery] Error adding to queue: %v\n", err) - return fmt.Errorf("failed to add merge request to queue: %w", err) - } - - _, _ = fmt.Fprintf(h.Output, "[Refinery] ✓ Added to merge queue: %s\n", mr.ID) - _, _ = fmt.Fprintf(h.Output, " Queue length: %d\n", h.Queue.Count()) + // The merge-request bead is created by `gt done` with gt:merge-request label. + // The Refinery queries beads directly via ReadyWithType("merge-request"). + // No need to add to mrqueue - that was a duplicate tracking file. + _, _ = fmt.Fprintf(h.Output, "[Refinery] ✓ Work verified - Refinery will pick up MR via beads query\n") return nil } diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index e9c2877f..ff85b97d 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -16,7 +16,6 @@ import ( "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/git" "github.com/steveyegge/gastown/internal/mail" - "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/protocol" "github.com/steveyegge/gastown/internal/rig" ) @@ -70,18 +69,35 @@ func DefaultMergeQueueConfig() *MergeQueueConfig { } } +// MRInfo holds merge request information for display and processing. +// This replaces mrqueue.MR after the mrqueue package removal. +type MRInfo struct { + ID string // Bead ID (e.g., "gt-abc123") + Branch string // Source branch (e.g., "polecat/nux") + Target string // Target branch (e.g., "main") + SourceIssue string // The work item being merged + Worker string // Who did the work + Rig string // Which rig + Title string // MR title + Priority int // Priority (lower = higher priority) + AgentBead string // Agent bead ID that created this MR + RetryCount int // Conflict retry count + ConvoyID string // Parent convoy ID if part of a convoy + ConvoyCreatedAt *time.Time // Convoy creation time + CreatedAt time.Time // MR creation time + BlockedBy string // Task ID blocking this MR +} + // Engineer is the merge queue processor that polls for ready merge-requests // and processes them according to the merge queue design. type Engineer struct { - rig *rig.Rig - beads *beads.Beads - mrQueue *mrqueue.Queue - git *git.Git - config *MergeQueueConfig - workDir string - output io.Writer // Output destination for user-facing messages - eventLogger *mrqueue.EventLogger - router *mail.Router // Mail router for sending protocol messages + rig *rig.Rig + beads *beads.Beads + git *git.Git + config *MergeQueueConfig + workDir string + output io.Writer // Output destination for user-facing messages + router *mail.Router // Mail router for sending protocol messages // stopCh is used for graceful shutdown stopCh chan struct{} @@ -102,16 +118,14 @@ func NewEngineer(r *rig.Rig) *Engineer { } return &Engineer{ - rig: r, - beads: beads.New(r.Path), - mrQueue: mrqueue.New(r.Path), - git: git.NewGit(gitDir), - config: cfg, - workDir: gitDir, - output: os.Stdout, - eventLogger: mrqueue.NewEventLoggerFromRig(r.Path), - router: mail.NewRouter(r.Path), - stopCh: make(chan struct{}), + rig: r, + beads: beads.New(r.Path), + git: git.NewGit(gitDir), + config: cfg, + workDir: gitDir, + output: os.Stdout, + router: mail.NewRouter(r.Path), + stopCh: make(chan struct{}), } } @@ -479,31 +493,21 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) { _, _ = fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) } -// ProcessMRFromQueue processes a merge request from wisp queue. -func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult { - // MR fields are directly on the struct (no parsing needed) - _, _ = fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:") +// ProcessMRInfo processes a merge request from MRInfo. +func (e *Engineer) ProcessMRInfo(ctx context.Context, mr *MRInfo) ProcessResult { + // MR fields are directly on the struct + _, _ = fmt.Fprintln(e.output, "[Engineer] Processing MR:") _, _ = fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch) _, _ = fmt.Fprintf(e.output, " Target: %s\n", mr.Target) _, _ = fmt.Fprintf(e.output, " Worker: %s\n", mr.Worker) _, _ = fmt.Fprintf(e.output, " Source: %s\n", mr.SourceIssue) - // Emit merge_started event - if err := e.eventLogger.LogMergeStarted(mr); err != nil { - _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_started event: %v\n", err) - } - // Use the shared merge logic return e.doMerge(ctx, mr.Branch, mr.Target, mr.SourceIssue) } -// handleSuccessFromQueue handles a successful merge from wisp queue. -func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) { - // Emit merged event - if err := e.eventLogger.LogMerged(mr, result.MergeCommit); err != nil { - _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merged event: %v\n", err) - } - +// HandleMRInfoSuccess handles a successful merge from MRInfo. +func (e *Engineer) HandleMRInfoSuccess(mr *MRInfo, result ProcessResult) { // Release merge slot if this was a conflict resolution // The slot is held while conflict resolution is in progress holder := e.rig.Name + "/refinery" @@ -518,7 +522,7 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) _, _ = fmt.Fprintf(e.output, "[Engineer] Released merge slot\n") } - // Update and close the MR bead (matches handleSuccess behavior) + // Update and close the MR bead if mr.ID != "" { // Fetch the MR bead to update its fields mrBead, err := e.beads.Show(mr.ID) @@ -572,24 +576,14 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) } } - // 3. Remove MR from queue (ephemeral - just delete the file) - if err := e.mrQueue.Remove(mr.ID); err != nil { - _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to remove MR from queue: %v\n", err) - } - - // 4. Log success + // 3. Log success _, _ = fmt.Fprintf(e.output, "[Engineer] ✓ Merged: %s (commit: %s)\n", mr.ID, result.MergeCommit) } -// handleFailureFromQueue handles a failed merge from wisp queue. +// HandleMRInfoFailure handles a failed merge from MRInfo. // For conflicts, creates a resolution task and blocks the MR until resolved. // This enables non-blocking delegation: the queue continues to the next MR. -func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) { - // Emit merge_failed event - if err := e.eventLogger.LogMergeFailed(mr, result.Error); err != nil { - _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_failed event: %v\n", err) - } - +func (e *Engineer) HandleMRInfoFailure(mr *MRInfo, result ProcessResult) { // Notify Witness of the failure so polecat can be alerted // Determine failure type from result failureType := "build" @@ -608,13 +602,13 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) // If this was a conflict, create a conflict-resolution task for dispatch // and block the MR until the task is resolved (non-blocking delegation) if result.Conflict { - taskID, err := e.createConflictResolutionTask(mr, result) + taskID, err := e.createConflictResolutionTaskForMR(mr, result) if err != nil { _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to create conflict resolution task: %v\n", err) - } else { - // Block the MR on the conflict resolution task + } else if taskID != "" { + // Block the MR on the conflict resolution task using beads dependency // When the task closes, the MR unblocks and re-enters the ready queue - if err := e.mrQueue.SetBlockedBy(mr.ID, taskID); err != nil { + if err := e.beads.AddDependency(mr.ID, taskID); err != nil { _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to block MR on task: %v\n", err) } else { _, _ = fmt.Fprintf(e.output, "[Engineer] MR %s blocked on conflict task %s (non-blocking delegation)\n", mr.ID, taskID) @@ -631,7 +625,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) } } -// createConflictResolutionTask creates a dispatchable task for resolving merge conflicts. +// createConflictResolutionTaskForMR creates a dispatchable task for resolving merge conflicts. // This task will be picked up by bd ready and can be slung to a fresh polecat (spawned on demand). // Returns the created task's ID for blocking the MR until resolution. // @@ -647,7 +641,7 @@ func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) // This serializes conflict resolution - only one polecat can resolve conflicts at a time. // If the slot is already held, we skip creating the task and let the MR stay in queue. // When the current resolution completes and merges, the slot is released. -func (e *Engineer) createConflictResolutionTask(mr *mrqueue.MR, _ ProcessResult) (string, error) { // result unused but kept for future merge diagnostics +func (e *Engineer) createConflictResolutionTaskForMR(mr *MRInfo, _ ProcessResult) (string, error) { // result unused but kept for future merge diagnostics // === MERGE SLOT GATE: Serialize conflict resolution === // Ensure merge slot exists (idempotent) slotID, err := e.beads.MergeSlotEnsureExists() @@ -743,14 +737,11 @@ The Refinery will automatically retry the merge after you force-push.`, _, _ = fmt.Fprintf(e.output, "[Engineer] Created conflict resolution task: %s (P%d)\n", task.ID, task.Priority) - // Update the MR's retry count for priority scoring - mr.RetryCount = retryCount - return task.ID, nil } // IsBeadOpen checks if a bead is still open (not closed). -// This is used as a status checker for mrqueue.ListReady to filter blocked MRs. +// This is used as a status checker to filter blocked MRs. func (e *Engineer) IsBeadOpen(beadID string) (bool, error) { issue, err := e.beads.Show(beadID) if err != nil { @@ -762,15 +753,172 @@ func (e *Engineer) IsBeadOpen(beadID string) (bool, error) { } // ListReadyMRs returns MRs that are ready for processing: -// - Not claimed by another worker (or claim is stale) -// - Not blocked by an open task -// Sorted by priority score (highest first). -func (e *Engineer) ListReadyMRs() ([]*mrqueue.MR, error) { - return e.mrQueue.ListReady(e.IsBeadOpen) +// - Not claimed by another worker (checked via assignee field) +// - Not blocked by an open task (handled by bd ready) +// Sorted by priority (highest first). +// +// This queries beads for merge-request wisps. +func (e *Engineer) ListReadyMRs() ([]*MRInfo, error) { + // Query beads for ready merge-request issues + issues, err := e.beads.ReadyWithType("merge-request") + if err != nil { + return nil, fmt.Errorf("querying beads for merge-requests: %w", err) + } + + // Convert beads issues to MRInfo + var mrs []*MRInfo + for _, issue := range issues { + fields := beads.ParseMRFields(issue) + if fields == nil { + continue // Skip issues without MR fields + } + + // Skip if already assigned (claimed by another worker) + if issue.Assignee != "" { + // TODO: Add stale claim detection based on updated_at + continue + } + + // Parse convoy created_at if present + var convoyCreatedAt *time.Time + if fields.ConvoyCreatedAt != "" { + if t, err := time.Parse(time.RFC3339, fields.ConvoyCreatedAt); err == nil { + convoyCreatedAt = &t + } + } + + // Parse issue created_at + var createdAt time.Time + if issue.CreatedAt != "" { + if t, err := time.Parse(time.RFC3339, issue.CreatedAt); err == nil { + createdAt = t + } + } + + mr := &MRInfo{ + ID: issue.ID, + Branch: fields.Branch, + Target: fields.Target, + SourceIssue: fields.SourceIssue, + Worker: fields.Worker, + Rig: fields.Rig, + Title: issue.Title, + Priority: issue.Priority, + AgentBead: fields.AgentBead, + RetryCount: fields.RetryCount, + ConvoyID: fields.ConvoyID, + ConvoyCreatedAt: convoyCreatedAt, + CreatedAt: createdAt, + } + mrs = append(mrs, mr) + } + + return mrs, nil } // ListBlockedMRs returns MRs that are blocked by open tasks. // Useful for monitoring/reporting. -func (e *Engineer) ListBlockedMRs() ([]*mrqueue.MR, error) { - return e.mrQueue.ListBlocked(e.IsBeadOpen) +// +// This queries beads for blocked merge-request issues. +func (e *Engineer) ListBlockedMRs() ([]*MRInfo, error) { + // Query all merge-request issues (both ready and blocked) + issues, err := e.beads.List(beads.ListOptions{ + Status: "open", + Label: "gt:merge-request", + Priority: -1, // No priority filter + }) + if err != nil { + return nil, fmt.Errorf("querying beads for merge-requests: %w", err) + } + + // Filter for blocked issues (those with open blockers) + var mrs []*MRInfo + for _, issue := range issues { + // Skip if not blocked + if len(issue.BlockedBy) == 0 { + continue + } + + // Check if any blocker is still open + hasOpenBlocker := false + for _, blockerID := range issue.BlockedBy { + isOpen, err := e.IsBeadOpen(blockerID) + if err == nil && isOpen { + hasOpenBlocker = true + break + } + } + if !hasOpenBlocker { + continue // All blockers are closed, not blocked + } + + fields := beads.ParseMRFields(issue) + if fields == nil { + continue + } + + // Parse convoy created_at if present + var convoyCreatedAt *time.Time + if fields.ConvoyCreatedAt != "" { + if t, err := time.Parse(time.RFC3339, fields.ConvoyCreatedAt); err == nil { + convoyCreatedAt = &t + } + } + + // Parse issue created_at + var createdAt time.Time + if issue.CreatedAt != "" { + if t, err := time.Parse(time.RFC3339, issue.CreatedAt); err == nil { + createdAt = t + } + } + + // Use the first open blocker as BlockedBy + blockedBy := "" + for _, blockerID := range issue.BlockedBy { + isOpen, err := e.IsBeadOpen(blockerID) + if err == nil && isOpen { + blockedBy = blockerID + break + } + } + + mr := &MRInfo{ + ID: issue.ID, + Branch: fields.Branch, + Target: fields.Target, + SourceIssue: fields.SourceIssue, + Worker: fields.Worker, + Rig: fields.Rig, + Title: issue.Title, + Priority: issue.Priority, + AgentBead: fields.AgentBead, + RetryCount: fields.RetryCount, + ConvoyID: fields.ConvoyID, + ConvoyCreatedAt: convoyCreatedAt, + CreatedAt: createdAt, + BlockedBy: blockedBy, + } + mrs = append(mrs, mr) + } + + return mrs, nil +} + +// ClaimMR claims an MR for processing by setting the assignee field. +// This replaces mrqueue.Claim() for beads-based MRs. +// The workerID is typically the refinery's identifier (e.g., "gastown/refinery"). +func (e *Engineer) ClaimMR(mrID, workerID string) error { + return e.beads.Update(mrID, beads.UpdateOptions{ + Assignee: &workerID, + }) +} + +// ReleaseMR releases a claimed MR back to the queue by clearing the assignee. +// This replaces mrqueue.Release() for beads-based MRs. +func (e *Engineer) ReleaseMR(mrID string) error { + empty := "" + return e.beads.Update(mrID, beads.UpdateOptions{ + Assignee: &empty, + }) } diff --git a/internal/refinery/manager.go b/internal/refinery/manager.go index 4ff6f647..980926e6 100644 --- a/internal/refinery/manager.go +++ b/internal/refinery/manager.go @@ -16,7 +16,6 @@ import ( "github.com/steveyegge/gastown/internal/constants" "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/mail" - "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/rig" "github.com/steveyegge/gastown/internal/runtime" "github.com/steveyegge/gastown/internal/session" @@ -358,7 +357,7 @@ func (m *Manager) calculateIssueScore(issue *beads.Issue, now time.Time) float64 } // Build score input - input := mrqueue.ScoreInput{ + input := ScoreInput{ Priority: issue.Priority, MRCreatedAt: mrCreatedAt, Now: now, @@ -376,7 +375,7 @@ func (m *Manager) calculateIssueScore(issue *beads.Issue, now time.Time) float64 } } - return mrqueue.ScoreMRWithDefaults(input) + return ScoreMRWithDefaults(input) } // issueToMR converts a beads issue to a MergeRequest. diff --git a/internal/mrqueue/priority.go b/internal/refinery/score.go similarity index 62% rename from internal/mrqueue/priority.go rename to internal/refinery/score.go index 9bed18da..0cfbfc21 100644 --- a/internal/mrqueue/priority.go +++ b/internal/refinery/score.go @@ -1,57 +1,7 @@ -// Package mrqueue provides merge request queue storage and priority scoring. -// -// # MQ Priority Objective Function -// -// The merge queue uses a priority scoring function to determine processing order. -// Higher scores mean higher priority (process first). -// -// ## Scoring Formula -// -// score = BaseScore -// + ConvoyAgeWeight * hoursOld(convoy) // Prevent starvation -// + PriorityWeight * (4 - priority) // P0 > P4 -// - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing -// + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker -// -// ## Default Weights -// -// BaseScore: 1000.0 (keeps all scores positive) -// ConvoyAgeWeight: 10.0 (10 pts/hour = 240 pts/day) -// PriorityWeight: 100.0 (P0=+400, P4=+0) -// RetryPenalty: 50.0 (each retry loses 50 pts) -// MRAgeWeight: 1.0 (1 pt/hour, minor FIFO factor) -// MaxRetryPenalty: 300.0 (caps at 6 retries worth) -// -// ## Design Principles -// -// 1. Deterministic: same inputs always produce same score (uses explicit Now param) -// -// 2. Convoy Starvation Prevention: older convoys escalate in priority. A 48-hour -// old P4 convoy will beat a fresh P0 standalone issue (+480 vs +400). -// -// 3. Priority Respect: within similar convoy ages, P0 issues beat P4 issues. -// -// 4. Thrashing Prevention: MRs that repeatedly fail with conflicts get -// deprioritized, giving the repo state time to stabilize. -// -// 5. FIFO Fairness: within same convoy/priority/retry state, older MRs go first. -// -// ## Example Scores -// -// Fresh P0, no convoy: 1400 (1000 + 400) -// Fresh P4, no convoy: 1000 (1000 + 0) -// Fresh P2, 24h convoy: 1440 (1000 + 200 + 240) -// Fresh P4, 48h convoy: 1480 (1000 + 0 + 480) -// P2, 24h convoy, 3 retries: 1290 (1000 + 200 + 240 - 150) -// P0, no convoy, 6+ retries (capped): 1100 (1000 + 400 - 300) -// -// ## Tuning -// -// All weights are configurable via ScoreConfig. The defaults are designed so: -// - A 48-hour convoy beats any standalone priority (starvation prevention) -// - Priority differences dominate within same convoy -// - Retry penalty is significant but capped (eventual progress guaranteed) -package mrqueue +// Package refinery provides the merge queue processing agent. +// This file contains priority scoring logic for merge requests. + +package refinery import ( "time" @@ -134,13 +84,6 @@ type ScoreInput struct { // + PriorityWeight * (4 - priority) // P0=+400, P4=+0 // - min(RetryPenalty * retryCount, MaxRetryPenalty) // Prevent thrashing // + MRAgeWeight * hoursOld(MR) // FIFO tiebreaker -// -// Design principles: -// - Deterministic: same inputs always produce same score -// - Convoy starvation prevention: older convoys escalate in priority -// - Priority respect: P0 bugs beat P4 backlog items -// - Thrashing prevention: repeated failures get deprioritized -// - FIFO fairness: within same convoy/priority, older MRs go first func ScoreMR(input ScoreInput, config ScoreConfig) float64 { now := input.Now if now.IsZero() { @@ -192,12 +135,12 @@ func ScoreMRWithDefaults(input ScoreInput) float64 { // Score calculates the priority score for this MR using default config. // Higher scores mean higher priority (process first). -func (mr *MR) Score() float64 { +func (mr *MRInfo) Score() float64 { return mr.ScoreAt(time.Now()) } // ScoreAt calculates the priority score at a specific time (for deterministic testing). -func (mr *MR) ScoreAt(now time.Time) float64 { +func (mr *MRInfo) ScoreAt(now time.Time) float64 { input := ScoreInput{ Priority: mr.Priority, MRCreatedAt: mr.CreatedAt, diff --git a/internal/tui/feed/mq_source.go b/internal/tui/feed/mq_source.go index c63fb52d..05851e80 100644 --- a/internal/tui/feed/mq_source.go +++ b/internal/tui/feed/mq_source.go @@ -1,102 +1,42 @@ package feed import ( - "bufio" "context" - "encoding/json" - "os" - "path/filepath" - "strings" - "time" - - "github.com/steveyegge/gastown/internal/mrqueue" ) -// MQEventSource reads MQ lifecycle events from mq_events.jsonl +// MQEventSource was used to read MQ lifecycle events from mq_events.jsonl. +// The mrqueue package has been removed, so this is now a no-op stub. +// MR events can be observed via beads activity instead. type MQEventSource struct { - file *os.File - events chan Event - cancel context.CancelFunc - logPath string + events chan Event + cancel context.CancelFunc } -// NewMQEventSource creates a source that tails MQ events from a beads directory. +// NewMQEventSource creates a stub source that produces no events. +// The mrqueue event log is no longer written. func NewMQEventSource(beadsDir string) (*MQEventSource, error) { - logPath := filepath.Join(beadsDir, "mq_events.jsonl") - - // Create file if it doesn't exist - if _, err := os.Stat(logPath); os.IsNotExist(err) { - // Ensure directory exists - if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil { - return nil, err - } - // Create empty file - f, err := os.Create(logPath) - if err != nil { - return nil, err - } - _ = f.Close() //nolint:gosec // G104: best-effort close on file creation - } - - file, err := os.Open(logPath) - if err != nil { - return nil, err - } - ctx, cancel := context.WithCancel(context.Background()) source := &MQEventSource{ - file: file, - events: make(chan Event, 100), - cancel: cancel, - logPath: logPath, + events: make(chan Event, 1), + cancel: cancel, } - go source.tail(ctx) + // Start a goroutine that just waits for cancellation + go func() { + <-ctx.Done() + close(source.events) + }() return source, nil } -// NewMQEventSourceFromWorkDir creates an MQ event source by finding the beads directory. +// NewMQEventSourceFromWorkDir creates an MQ event source (stub). func NewMQEventSourceFromWorkDir(workDir string) (*MQEventSource, error) { - beadsDir, err := FindBeadsDir(workDir) - if err != nil { - return nil, err - } - return NewMQEventSource(beadsDir) + return NewMQEventSource("") } -// tail follows the MQ event log file and sends events. -func (s *MQEventSource) tail(ctx context.Context) { - defer close(s.events) - - // Seek to end for live tailing - _, _ = s.file.Seek(0, 2) - - scanner := bufio.NewScanner(s.file) - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - for scanner.Scan() { - line := scanner.Text() - if event := parseMQEventLine(line); event != nil { - select { - case s.events <- *event: - default: - // Drop event if channel full - } - } - } - } - } -} - -// Events returns the event channel. +// Events returns the event channel (always empty). func (s *MQEventSource) Events() <-chan Event { return s.events } @@ -104,86 +44,5 @@ func (s *MQEventSource) Events() <-chan Event { // Close stops the source. func (s *MQEventSource) Close() error { s.cancel() - return s.file.Close() -} - -// parseMQEventLine parses a line from mq_events.jsonl into a feed Event. -func parseMQEventLine(line string) *Event { - if strings.TrimSpace(line) == "" { - return nil - } - - var mqEvent mrqueue.Event - if err := json.Unmarshal([]byte(line), &mqEvent); err != nil { - return nil - } - - // Convert MQ event to feed Event - feedType := mapMQEventType(mqEvent.Type) - message := formatMQEventMessage(mqEvent) - - return &Event{ - Time: mqEvent.Timestamp, - Type: feedType, - Actor: "refinery", - Target: mqEvent.MRID, - Message: message, - Rig: mqEvent.Rig, - Role: "refinery", - Raw: line, - } -} - -// mapMQEventType maps MQ event types to feed event types. -func mapMQEventType(mqType mrqueue.EventType) string { - switch mqType { - case mrqueue.EventMergeStarted: - return "merge_started" - case mrqueue.EventMerged: - return "merged" - case mrqueue.EventMergeFailed: - return "merge_failed" - case mrqueue.EventMergeSkipped: - return "merge_skipped" - default: - return string(mqType) - } -} - -// formatMQEventMessage creates a human-readable message for an MQ event. -func formatMQEventMessage(e mrqueue.Event) string { - branchInfo := e.Branch - if e.Target != "" { - branchInfo += " -> " + e.Target - } - - switch e.Type { - case mrqueue.EventMergeStarted: - return "Merge started: " + branchInfo - case mrqueue.EventMerged: - msg := "Merged: " + branchInfo - if e.MergeCommit != "" { - // Show short commit SHA - sha := e.MergeCommit - if len(sha) > 8 { - sha = sha[:8] - } - msg += " (" + sha + ")" - } - return msg - case mrqueue.EventMergeFailed: - msg := "Merge failed: " + branchInfo - if e.Reason != "" { - msg += " - " + e.Reason - } - return msg - case mrqueue.EventMergeSkipped: - msg := "Merge skipped: " + branchInfo - if e.Reason != "" { - msg += " - " + e.Reason - } - return msg - default: - return string(e.Type) + ": " + branchInfo - } + return nil } diff --git a/internal/tui/feed/mq_source_test.go b/internal/tui/feed/mq_source_test.go deleted file mode 100644 index b1b2358b..00000000 --- a/internal/tui/feed/mq_source_test.go +++ /dev/null @@ -1,144 +0,0 @@ -package feed - -import ( - "encoding/json" - "testing" - "time" - - "github.com/steveyegge/gastown/internal/mrqueue" -) - -func TestParseMQEventLine(t *testing.T) { - tests := []struct { - name string - event mrqueue.Event - wantType string - wantTarget string - wantContains string // Substring in message - }{ - { - name: "merge_started", - event: mrqueue.Event{ - Timestamp: time.Now(), - Type: mrqueue.EventMergeStarted, - MRID: "mr-123", - Branch: "polecat/nux", - Target: "main", - Worker: "nux", - Rig: "gastown", - }, - wantType: "merge_started", - wantTarget: "mr-123", - wantContains: "Merge started", - }, - { - name: "merged", - event: mrqueue.Event{ - Timestamp: time.Now(), - Type: mrqueue.EventMerged, - MRID: "mr-456", - Branch: "polecat/toast", - Target: "main", - Worker: "toast", - Rig: "gastown", - MergeCommit: "abc123def456789", - }, - wantType: "merged", - wantTarget: "mr-456", - wantContains: "abc123de", // Short SHA - }, - { - name: "merge_failed", - event: mrqueue.Event{ - Timestamp: time.Now(), - Type: mrqueue.EventMergeFailed, - MRID: "mr-789", - Branch: "polecat/capable", - Target: "main", - Worker: "capable", - Rig: "gastown", - Reason: "conflict in main.go", - }, - wantType: "merge_failed", - wantTarget: "mr-789", - wantContains: "conflict in main.go", - }, - { - name: "merge_skipped", - event: mrqueue.Event{ - Timestamp: time.Now(), - Type: mrqueue.EventMergeSkipped, - MRID: "mr-999", - Branch: "polecat/skip", - Target: "main", - Reason: "already merged", - }, - wantType: "merge_skipped", - wantTarget: "mr-999", - wantContains: "already merged", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Marshal to JSON line - data, err := json.Marshal(tt.event) - if err != nil { - t.Fatalf("Failed to marshal event: %v", err) - } - - // Parse the line - result := parseMQEventLine(string(data)) - if result == nil { - t.Fatal("parseMQEventLine returned nil") - } - - if result.Type != tt.wantType { - t.Errorf("Type = %q, want %q", result.Type, tt.wantType) - } - - if result.Target != tt.wantTarget { - t.Errorf("Target = %q, want %q", result.Target, tt.wantTarget) - } - - if tt.wantContains != "" && !contains(result.Message, tt.wantContains) { - t.Errorf("Message = %q, want to contain %q", result.Message, tt.wantContains) - } - - // Actor should be refinery - if result.Actor != "refinery" { - t.Errorf("Actor = %q, want %q", result.Actor, "refinery") - } - - if result.Role != "refinery" { - t.Errorf("Role = %q, want %q", result.Role, "refinery") - } - }) - } -} - -func TestParseMQEventLineEmpty(t *testing.T) { - result := parseMQEventLine("") - if result != nil { - t.Error("Expected nil for empty line") - } - - result = parseMQEventLine(" ") - if result != nil { - t.Error("Expected nil for whitespace-only line") - } - - result = parseMQEventLine("not valid json") - if result != nil { - t.Error("Expected nil for invalid JSON") - } -} - -func contains(s, substr string) bool { - for i := 0; i <= len(s)-len(substr); i++ { - if s[i:i+len(substr)] == substr { - return true - } - } - return false -}