Remove stubbed ProcessMRFromQueue and mrqueue package (gt-u4fh)
The mrqueue package was aspirational 'beads-based MR automation' that was never completed. The Refinery agent is prompt-driven and uses beads/mail for coordination, not a Go-based polling queue. Removed: - internal/mrqueue/mrqueue.go (entire package) - internal/cmd/mq_migrate.go (migration command for mrqueue) - mrqueue submission from done.go and mq_submit.go - Engineer.ProcessMRFromQueue() and related queue handlers - Engineer.Run(), Stop(), processOnce() methods - mrQueue field and stopCh from Engineer struct - stopCh assertion from TestNewEngineer Kept: - Bead creation for merge-requests (audit record) - Engineer struct and NewEngineer for potential future use - Engineer.ProcessMR() (works with beads.Issue) - Manager.ProcessMR() which is the working implementation 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -10,7 +10,6 @@ import (
|
|||||||
"github.com/steveyegge/gastown/internal/events"
|
"github.com/steveyegge/gastown/internal/events"
|
||||||
"github.com/steveyegge/gastown/internal/git"
|
"github.com/steveyegge/gastown/internal/git"
|
||||||
"github.com/steveyegge/gastown/internal/mail"
|
"github.com/steveyegge/gastown/internal/mail"
|
||||||
"github.com/steveyegge/gastown/internal/mrqueue"
|
|
||||||
"github.com/steveyegge/gastown/internal/style"
|
"github.com/steveyegge/gastown/internal/style"
|
||||||
"github.com/steveyegge/gastown/internal/workspace"
|
"github.com/steveyegge/gastown/internal/workspace"
|
||||||
)
|
)
|
||||||
@@ -164,28 +163,6 @@ func runDone(cmd *cobra.Command, args []string) error {
|
|||||||
}
|
}
|
||||||
mrID = mrIssue.ID
|
mrID = mrIssue.ID
|
||||||
|
|
||||||
// Also submit to mrqueue so refinery can process it
|
|
||||||
// The mrqueue is the work queue the refinery polls; beads are the audit record
|
|
||||||
mq, err := mrqueue.NewFromWorkdir(cwd)
|
|
||||||
if err != nil {
|
|
||||||
// Non-fatal: bead was created, just warn about queue
|
|
||||||
style.PrintWarning("could not access merge queue: %v", err)
|
|
||||||
} else {
|
|
||||||
mqEntry := &mrqueue.MR{
|
|
||||||
ID: mrID,
|
|
||||||
Branch: branch,
|
|
||||||
Target: target,
|
|
||||||
SourceIssue: issueID,
|
|
||||||
Worker: worker,
|
|
||||||
Rig: rigName,
|
|
||||||
Title: title,
|
|
||||||
Priority: priority,
|
|
||||||
}
|
|
||||||
if err := mq.Submit(mqEntry); err != nil {
|
|
||||||
style.PrintWarning("could not submit to merge queue: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Success output
|
// Success output
|
||||||
fmt.Printf("%s Work submitted to merge queue\n", style.Bold.Render("✓"))
|
fmt.Printf("%s Work submitted to merge queue\n", style.Bold.Render("✓"))
|
||||||
fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrID))
|
fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrID))
|
||||||
|
|||||||
@@ -1,175 +0,0 @@
|
|||||||
package cmd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
|
||||||
"github.com/steveyegge/gastown/internal/mrqueue"
|
|
||||||
"github.com/steveyegge/gastown/internal/style"
|
|
||||||
"github.com/steveyegge/gastown/internal/workspace"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
mqMigrateDryRun bool
|
|
||||||
)
|
|
||||||
|
|
||||||
var mqMigrateCmd = &cobra.Command{
|
|
||||||
Use: "migrate",
|
|
||||||
Short: "Migrate stale merge-request beads to the mrqueue",
|
|
||||||
Long: `Migrate existing merge-request beads to the mrqueue.
|
|
||||||
|
|
||||||
This command finds merge-request beads that were created before the mrqueue
|
|
||||||
integration and adds them to the refinery's work queue (.beads/mq/).
|
|
||||||
|
|
||||||
Use this to recover stale MRs that the refinery wasn't processing because
|
|
||||||
they only existed as beads.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
gt mq migrate # Migrate all stale MRs
|
|
||||||
gt mq migrate --dry-run # Preview what would be migrated`,
|
|
||||||
RunE: runMqMigrate,
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
mqMigrateCmd.Flags().BoolVar(&mqMigrateDryRun, "dry-run", false, "Preview only, don't actually migrate")
|
|
||||||
mqCmd.AddCommand(mqMigrateCmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func runMqMigrate(cmd *cobra.Command, args []string) error {
|
|
||||||
// Find workspace
|
|
||||||
townRoot, err := workspace.FindFromCwdOrError()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("not in a Gas Town workspace: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find current rig
|
|
||||||
rigName, _, err := findCurrentRig(townRoot)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize beads
|
|
||||||
cwd, err := os.Getwd()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("getting current directory: %w", err)
|
|
||||||
}
|
|
||||||
bd := beads.New(cwd)
|
|
||||||
|
|
||||||
// Initialize mrqueue
|
|
||||||
mq, err := mrqueue.NewFromWorkdir(cwd)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("accessing merge queue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get existing mrqueue entries to avoid duplicates
|
|
||||||
existingMRs, err := mq.List()
|
|
||||||
if err != nil && !os.IsNotExist(err) {
|
|
||||||
return fmt.Errorf("listing existing queue: %w", err)
|
|
||||||
}
|
|
||||||
existingIDs := make(map[string]bool)
|
|
||||||
for _, mr := range existingMRs {
|
|
||||||
existingIDs[mr.ID] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// List all open merge-request beads
|
|
||||||
// Note: beads.List() with default ListOptions filters for P0 only (Priority default is 0)
|
|
||||||
// So we use Priority: -1 to indicate no filter
|
|
||||||
allIssues, err := bd.List(beads.ListOptions{Priority: -1})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("listing all beads: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter for open merge-requests
|
|
||||||
var issues []*beads.Issue
|
|
||||||
for _, issue := range allIssues {
|
|
||||||
if issue.Type == "merge-request" && issue.Status == "open" {
|
|
||||||
issues = append(issues, issue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(issues) == 0 {
|
|
||||||
fmt.Println("No stale merge-request beads found.")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter to only those not already in mrqueue
|
|
||||||
var toMigrate []*beads.Issue
|
|
||||||
for _, issue := range issues {
|
|
||||||
if !existingIDs[issue.ID] {
|
|
||||||
toMigrate = append(toMigrate, issue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(toMigrate) == 0 {
|
|
||||||
fmt.Println("All merge-request beads already in mrqueue.")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Found %d stale merge-request bead(s) to migrate:\n\n", len(toMigrate))
|
|
||||||
|
|
||||||
migrated := 0
|
|
||||||
for _, issue := range toMigrate {
|
|
||||||
// Parse MR fields from bead description
|
|
||||||
mrFields := beads.ParseMRFields(issue)
|
|
||||||
if mrFields == nil {
|
|
||||||
fmt.Printf(" %s %s - skipping (no MR fields in description)\n",
|
|
||||||
style.Dim.Render("⚠"), issue.ID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract worker from description
|
|
||||||
worker := mrFields.Worker
|
|
||||||
if worker == "" {
|
|
||||||
// Try to extract from branch name
|
|
||||||
if strings.HasPrefix(mrFields.Branch, "polecat/") {
|
|
||||||
parts := strings.SplitN(mrFields.Branch, "/", 3)
|
|
||||||
if len(parts) >= 2 {
|
|
||||||
worker = parts[1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if mqMigrateDryRun {
|
|
||||||
fmt.Printf(" %s %s - %s (branch: %s, target: %s)\n",
|
|
||||||
style.Bold.Render("→"), issue.ID, issue.Title,
|
|
||||||
mrFields.Branch, mrFields.Target)
|
|
||||||
} else {
|
|
||||||
// Create mrqueue entry
|
|
||||||
mqEntry := &mrqueue.MR{
|
|
||||||
ID: issue.ID,
|
|
||||||
Branch: mrFields.Branch,
|
|
||||||
Target: mrFields.Target,
|
|
||||||
SourceIssue: mrFields.SourceIssue,
|
|
||||||
Worker: worker,
|
|
||||||
Rig: rigName,
|
|
||||||
Title: issue.Title,
|
|
||||||
Priority: issue.Priority,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mq.Submit(mqEntry); err != nil {
|
|
||||||
fmt.Printf(" %s %s - failed: %v\n",
|
|
||||||
style.Dim.Render("✗"), issue.ID, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf(" %s %s - %s\n",
|
|
||||||
style.Bold.Render("✓"), issue.ID, issue.Title)
|
|
||||||
migrated++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println()
|
|
||||||
if mqMigrateDryRun {
|
|
||||||
fmt.Printf("Dry run: would migrate %d merge-request(s)\n", len(toMigrate))
|
|
||||||
fmt.Println("Run without --dry-run to perform migration.")
|
|
||||||
} else {
|
|
||||||
fmt.Printf("%s Migrated %d merge-request(s) to mrqueue\n",
|
|
||||||
style.Bold.Render("✓"), migrated)
|
|
||||||
fmt.Println("The refinery will process them on its next poll cycle.")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
"github.com/steveyegge/gastown/internal/git"
|
"github.com/steveyegge/gastown/internal/git"
|
||||||
"github.com/steveyegge/gastown/internal/mrqueue"
|
|
||||||
"github.com/steveyegge/gastown/internal/style"
|
"github.com/steveyegge/gastown/internal/style"
|
||||||
"github.com/steveyegge/gastown/internal/workspace"
|
"github.com/steveyegge/gastown/internal/workspace"
|
||||||
)
|
)
|
||||||
@@ -150,28 +149,6 @@ func runMqSubmit(cmd *cobra.Command, args []string) error {
|
|||||||
return fmt.Errorf("creating merge request bead: %w", err)
|
return fmt.Errorf("creating merge request bead: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also submit to mrqueue so refinery can process it
|
|
||||||
// The mrqueue is the work queue the refinery polls; beads are the audit record
|
|
||||||
mq, err := mrqueue.NewFromWorkdir(cwd)
|
|
||||||
if err != nil {
|
|
||||||
// Non-fatal: bead was created, just warn about queue
|
|
||||||
style.PrintWarning("could not access merge queue: %v", err)
|
|
||||||
} else {
|
|
||||||
mqEntry := &mrqueue.MR{
|
|
||||||
ID: mrIssue.ID,
|
|
||||||
Branch: branch,
|
|
||||||
Target: target,
|
|
||||||
SourceIssue: issueID,
|
|
||||||
Worker: worker,
|
|
||||||
Rig: rigName,
|
|
||||||
Title: title,
|
|
||||||
Priority: priority,
|
|
||||||
}
|
|
||||||
if err := mq.Submit(mqEntry); err != nil {
|
|
||||||
style.PrintWarning("could not submit to merge queue: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Success output
|
// Success output
|
||||||
fmt.Printf("%s Submitted to merge queue\n", style.Bold.Render("✓"))
|
fmt.Printf("%s Submitted to merge queue\n", style.Bold.Render("✓"))
|
||||||
fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrIssue.ID))
|
fmt.Printf(" MR ID: %s\n", style.Bold.Render(mrIssue.ID))
|
||||||
|
|||||||
@@ -1,214 +0,0 @@
|
|||||||
// Package mrqueue provides merge request queue storage.
|
|
||||||
// MRs are stored locally in .beads/mq/ and deleted after merge.
|
|
||||||
// This avoids sync overhead for transient MR state.
|
|
||||||
package mrqueue
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/rand"
|
|
||||||
"encoding/hex"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// MR represents a merge request in the queue.
|
|
||||||
type MR struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Branch string `json:"branch"` // Source branch (e.g., "polecat/nux")
|
|
||||||
Target string `json:"target"` // Target branch (e.g., "main")
|
|
||||||
SourceIssue string `json:"source_issue"` // The work item being merged
|
|
||||||
Worker string `json:"worker"` // Who did the work
|
|
||||||
Rig string `json:"rig"` // Which rig
|
|
||||||
Title string `json:"title"` // MR title
|
|
||||||
Priority int `json:"priority"` // Priority (lower = higher priority)
|
|
||||||
CreatedAt time.Time `json:"created_at"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue manages the MR storage.
|
|
||||||
type Queue struct {
|
|
||||||
dir string // .beads/mq/ directory
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new MR queue for the given rig path.
|
|
||||||
func New(rigPath string) *Queue {
|
|
||||||
return &Queue{
|
|
||||||
dir: filepath.Join(rigPath, ".beads", "mq"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewFromWorkdir creates a queue by finding the rig root from a working directory.
|
|
||||||
// It follows beads redirects to ensure all clones use the same shared mrqueue.
|
|
||||||
func NewFromWorkdir(workdir string) (*Queue, error) {
|
|
||||||
// Walk up to find .beads or rig root
|
|
||||||
dir := workdir
|
|
||||||
for {
|
|
||||||
beadsDir := filepath.Join(dir, ".beads")
|
|
||||||
if info, err := os.Stat(beadsDir); err == nil && info.IsDir() {
|
|
||||||
// Check for redirect and follow it
|
|
||||||
finalDir := resolveBeadsRedirect(beadsDir)
|
|
||||||
return &Queue{dir: filepath.Join(finalDir, "mq")}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
parent := filepath.Dir(dir)
|
|
||||||
if parent == dir {
|
|
||||||
return nil, fmt.Errorf("could not find .beads directory from %s", workdir)
|
|
||||||
}
|
|
||||||
dir = parent
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// resolveBeadsRedirect follows beads redirect files to find the final directory.
|
|
||||||
// Returns the original dir if no redirect or on error.
|
|
||||||
func resolveBeadsRedirect(beadsDir string) string {
|
|
||||||
redirectPath := filepath.Join(beadsDir, "redirect")
|
|
||||||
data, err := os.ReadFile(redirectPath)
|
|
||||||
if err != nil {
|
|
||||||
return beadsDir // No redirect file
|
|
||||||
}
|
|
||||||
|
|
||||||
target := strings.TrimSpace(string(data))
|
|
||||||
if target == "" {
|
|
||||||
return beadsDir
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve relative path from beadsDir's parent
|
|
||||||
if !filepath.IsAbs(target) {
|
|
||||||
target = filepath.Join(filepath.Dir(beadsDir), target)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clean and verify the target exists
|
|
||||||
target = filepath.Clean(target)
|
|
||||||
if info, err := os.Stat(target); err == nil && info.IsDir() {
|
|
||||||
return target
|
|
||||||
}
|
|
||||||
|
|
||||||
return beadsDir // Target doesn't exist, use original
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnsureDir creates the MQ directory if it doesn't exist.
|
|
||||||
func (q *Queue) EnsureDir() error {
|
|
||||||
return os.MkdirAll(q.dir, 0755)
|
|
||||||
}
|
|
||||||
|
|
||||||
// generateID creates a unique MR ID.
|
|
||||||
func generateID() string {
|
|
||||||
b := make([]byte, 4)
|
|
||||||
rand.Read(b)
|
|
||||||
return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(b))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Submit adds a new MR to the queue.
|
|
||||||
func (q *Queue) Submit(mr *MR) error {
|
|
||||||
if err := q.EnsureDir(); err != nil {
|
|
||||||
return fmt.Errorf("creating mq directory: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if mr.ID == "" {
|
|
||||||
mr.ID = generateID()
|
|
||||||
}
|
|
||||||
if mr.CreatedAt.IsZero() {
|
|
||||||
mr.CreatedAt = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.MarshalIndent(mr, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("marshaling MR: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
path := filepath.Join(q.dir, mr.ID+".json")
|
|
||||||
if err := os.WriteFile(path, data, 0644); err != nil {
|
|
||||||
return fmt.Errorf("writing MR file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// List returns all pending MRs, sorted by priority then creation time.
|
|
||||||
func (q *Queue) List() ([]*MR, error) {
|
|
||||||
entries, err := os.ReadDir(q.dir)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return nil, nil // Empty queue
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("reading mq directory: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var mrs []*MR
|
|
||||||
for _, entry := range entries {
|
|
||||||
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
mr, err := q.load(filepath.Join(q.dir, entry.Name()))
|
|
||||||
if err != nil {
|
|
||||||
continue // Skip malformed files
|
|
||||||
}
|
|
||||||
mrs = append(mrs, mr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sort by priority (lower first), then by creation time (older first)
|
|
||||||
sort.Slice(mrs, func(i, j int) bool {
|
|
||||||
if mrs[i].Priority != mrs[j].Priority {
|
|
||||||
return mrs[i].Priority < mrs[j].Priority
|
|
||||||
}
|
|
||||||
return mrs[i].CreatedAt.Before(mrs[j].CreatedAt)
|
|
||||||
})
|
|
||||||
|
|
||||||
return mrs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a specific MR by ID.
|
|
||||||
func (q *Queue) Get(id string) (*MR, error) {
|
|
||||||
path := filepath.Join(q.dir, id+".json")
|
|
||||||
return q.load(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
// load reads an MR from a file path.
|
|
||||||
func (q *Queue) load(path string) (*MR, error) {
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var mr MR
|
|
||||||
if err := json.Unmarshal(data, &mr); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &mr, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove deletes an MR from the queue (after successful merge).
|
|
||||||
func (q *Queue) Remove(id string) error {
|
|
||||||
path := filepath.Join(q.dir, id+".json")
|
|
||||||
err := os.Remove(path)
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
return nil // Already removed
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Count returns the number of pending MRs.
|
|
||||||
func (q *Queue) Count() int {
|
|
||||||
entries, err := os.ReadDir(q.dir)
|
|
||||||
if err != nil {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
count := 0
|
|
||||||
for _, entry := range entries {
|
|
||||||
if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return count
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dir returns the queue directory path.
|
|
||||||
func (q *Queue) Dir() string {
|
|
||||||
return q.dir
|
|
||||||
}
|
|
||||||
@@ -11,9 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
"github.com/steveyegge/gastown/internal/events"
|
|
||||||
"github.com/steveyegge/gastown/internal/git"
|
"github.com/steveyegge/gastown/internal/git"
|
||||||
"github.com/steveyegge/gastown/internal/mrqueue"
|
|
||||||
"github.com/steveyegge/gastown/internal/rig"
|
"github.com/steveyegge/gastown/internal/rig"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -71,14 +69,10 @@ func DefaultMergeQueueConfig() *MergeQueueConfig {
|
|||||||
type Engineer struct {
|
type Engineer struct {
|
||||||
rig *rig.Rig
|
rig *rig.Rig
|
||||||
beads *beads.Beads
|
beads *beads.Beads
|
||||||
mrQueue *mrqueue.Queue
|
|
||||||
git *git.Git
|
git *git.Git
|
||||||
config *MergeQueueConfig
|
config *MergeQueueConfig
|
||||||
workDir string
|
workDir string
|
||||||
output io.Writer // Output destination for user-facing messages
|
output io.Writer // Output destination for user-facing messages
|
||||||
|
|
||||||
// stopCh is used for graceful shutdown
|
|
||||||
stopCh chan struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEngineer creates a new Engineer for the given rig.
|
// NewEngineer creates a new Engineer for the given rig.
|
||||||
@@ -86,12 +80,10 @@ func NewEngineer(r *rig.Rig) *Engineer {
|
|||||||
return &Engineer{
|
return &Engineer{
|
||||||
rig: r,
|
rig: r,
|
||||||
beads: beads.New(r.Path),
|
beads: beads.New(r.Path),
|
||||||
mrQueue: mrqueue.New(r.Path),
|
|
||||||
git: git.NewGit(r.Path),
|
git: git.NewGit(r.Path),
|
||||||
config: DefaultMergeQueueConfig(),
|
config: DefaultMergeQueueConfig(),
|
||||||
workDir: r.Path,
|
workDir: r.Path,
|
||||||
output: os.Stdout,
|
output: os.Stdout,
|
||||||
stopCh: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -189,90 +181,6 @@ func (e *Engineer) Config() *MergeQueueConfig {
|
|||||||
return e.config
|
return e.config
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the Engineer main loop. It blocks until the context is cancelled
|
|
||||||
// or Stop() is called. Returns nil on graceful shutdown.
|
|
||||||
func (e *Engineer) Run(ctx context.Context) error {
|
|
||||||
if err := e.LoadConfig(); err != nil {
|
|
||||||
return fmt.Errorf("loading config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !e.config.Enabled {
|
|
||||||
return fmt.Errorf("merge queue is disabled in configuration")
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Starting for rig %s (poll_interval=%s)\n",
|
|
||||||
e.rig.Name, e.config.PollInterval)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(e.config.PollInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
// Run one iteration immediately, then on ticker
|
|
||||||
if err := e.processOnce(ctx); err != nil {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Error: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
fmt.Fprintln(e.output, "[Engineer] Shutting down (context cancelled)")
|
|
||||||
return nil
|
|
||||||
case <-e.stopCh:
|
|
||||||
fmt.Fprintln(e.output, "[Engineer] Shutting down (stop signal)")
|
|
||||||
return nil
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := e.processOnce(ctx); err != nil {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Error: %v\n", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop signals the Engineer to stop processing. This is a non-blocking call.
|
|
||||||
func (e *Engineer) Stop() {
|
|
||||||
close(e.stopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// processOnce performs one iteration of the Engineer loop:
|
|
||||||
// 1. Query for ready merge-requests from wisp storage
|
|
||||||
// 2. If none, return (will try again on next tick)
|
|
||||||
// 3. Process the highest priority, oldest MR
|
|
||||||
func (e *Engineer) processOnce(ctx context.Context) error {
|
|
||||||
// Check context before starting
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. Query MR queue (wisp storage - already sorted by priority/age)
|
|
||||||
pendingMRs, err := e.mrQueue.List()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("querying merge queue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. If empty, return
|
|
||||||
if len(pendingMRs) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Select highest priority, oldest MR (List already returns sorted)
|
|
||||||
mr := pendingMRs[0]
|
|
||||||
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Processing: %s (%s)\n", mr.ID, mr.Title)
|
|
||||||
|
|
||||||
// 4. Process MR
|
|
||||||
result := e.ProcessMRFromQueue(ctx, mr)
|
|
||||||
|
|
||||||
// 5. Handle result
|
|
||||||
if result.Success {
|
|
||||||
e.handleSuccessFromQueue(mr, result)
|
|
||||||
} else {
|
|
||||||
e.handleFailureFromQueue(mr, result)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProcessResult contains the result of processing a merge request.
|
// ProcessResult contains the result of processing a merge request.
|
||||||
type ProcessResult struct {
|
type ProcessResult struct {
|
||||||
Success bool
|
Success bool
|
||||||
@@ -372,72 +280,3 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
|
|||||||
|
|
||||||
// Full failure handling (assign back to worker, labels) in gt-3x1.4
|
// Full failure handling (assign back to worker, labels) in gt-3x1.4
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessMRFromQueue processes a merge request from wisp queue.
|
|
||||||
func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult {
|
|
||||||
// Emit merge_started event
|
|
||||||
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
|
|
||||||
_ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
|
|
||||||
|
|
||||||
// MR fields are directly on the struct (no parsing needed)
|
|
||||||
fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:")
|
|
||||||
fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch)
|
|
||||||
fmt.Fprintf(e.output, " Target: %s\n", mr.Target)
|
|
||||||
fmt.Fprintf(e.output, " Worker: %s\n", mr.Worker)
|
|
||||||
fmt.Fprintf(e.output, " Source: %s\n", mr.SourceIssue)
|
|
||||||
|
|
||||||
// TODO: Actual merge implementation
|
|
||||||
// For now, return failure - actual implementation in gt-3x1.2
|
|
||||||
return ProcessResult{
|
|
||||||
Success: false,
|
|
||||||
Error: "ProcessMRFromQueue not fully implemented (see gt-3x1.2)",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleSuccessFromQueue handles a successful merge from wisp queue.
|
|
||||||
func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) {
|
|
||||||
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
|
|
||||||
|
|
||||||
// Emit merged event
|
|
||||||
_ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
|
|
||||||
|
|
||||||
// 1. Close source issue with reference to MR
|
|
||||||
if mr.SourceIssue != "" {
|
|
||||||
closeReason := fmt.Sprintf("Merged in %s", mr.ID)
|
|
||||||
if err := e.beads.CloseWithReason(closeReason, mr.SourceIssue); err != nil {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err)
|
|
||||||
} else {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Delete source branch if configured (local only)
|
|
||||||
if e.config.DeleteMergedBranches && mr.Branch != "" {
|
|
||||||
if err := e.git.DeleteBranch(mr.Branch, true); err != nil {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Warning: failed to delete branch %s: %v\n", mr.Branch, err)
|
|
||||||
} else {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Deleted local branch: %s\n", mr.Branch)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Remove MR from queue (ephemeral - just delete the file)
|
|
||||||
if err := e.mrQueue.Remove(mr.ID); err != nil {
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] Warning: failed to remove MR from queue: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Log success
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] ✓ Merged: %s (commit: %s)\n", mr.ID, result.MergeCommit)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleFailureFromQueue handles a failed merge from wisp queue.
|
|
||||||
func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) {
|
|
||||||
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
|
|
||||||
|
|
||||||
// Emit merge_failed event
|
|
||||||
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
|
|
||||||
|
|
||||||
// MR stays in queue for retry - no action needed on the file
|
|
||||||
// Log the failure
|
|
||||||
fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)
|
|
||||||
fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry")
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -206,9 +206,6 @@ func TestNewEngineer(t *testing.T) {
|
|||||||
if e.config == nil {
|
if e.config == nil {
|
||||||
t.Error("expected config to be initialized with defaults")
|
t.Error("expected config to be initialized with defaults")
|
||||||
}
|
}
|
||||||
if e.stopCh == nil {
|
|
||||||
t.Error("expected stopCh to be initialized")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEngineer_DeleteMergedBranchesConfig(t *testing.T) {
|
func TestEngineer_DeleteMergedBranchesConfig(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user