Merge main, keeping main's manager.go and our FailureType tests

This commit is contained in:
Steve Yegge
2025-12-19 16:26:38 -08:00
82 changed files with 13666 additions and 1172 deletions
+321
View File
@@ -0,0 +1,321 @@
// Package refinery provides the merge queue processing agent.
package refinery
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/rig"
)
// MergeQueueConfig holds configuration for the merge queue processor.
//
// NOTE(review): PollInterval is a time.Duration carrying a plain json tag, so
// encoding/json reads and writes it as an integer number of nanoseconds when
// this struct is (un)marshalled directly. Engineer.LoadConfig does not decode
// into this struct; it reads "poll_interval" as a duration string such as
// "30s" and converts it with time.ParseDuration.
type MergeQueueConfig struct {
	// Enabled controls whether the merge queue is active.
	Enabled bool `json:"enabled"`
	// TargetBranch is the default branch to merge to (e.g., "main").
	TargetBranch string `json:"target_branch"`
	// IntegrationBranches enables per-epic integration branches.
	IntegrationBranches bool `json:"integration_branches"`
	// OnConflict is the strategy for handling conflicts: "assign_back" or "auto_rebase".
	OnConflict string `json:"on_conflict"`
	// RunTests controls whether to run tests before merging.
	RunTests bool `json:"run_tests"`
	// TestCommand is the command to run for testing.
	TestCommand string `json:"test_command"`
	// DeleteMergedBranches controls whether to delete branches after merge.
	DeleteMergedBranches bool `json:"delete_merged_branches"`
	// RetryFlakyTests is the number of times to retry flaky tests.
	RetryFlakyTests int `json:"retry_flaky_tests"`
	// PollInterval is how often to check for new MRs.
	// Engineer.Run passes this value to time.NewTicker.
	PollInterval time.Duration `json:"poll_interval"`
	// MaxConcurrent is the maximum number of MRs to process concurrently.
	MaxConcurrent int `json:"max_concurrent"`
}
// DefaultMergeQueueConfig builds a fresh MergeQueueConfig carrying the stock
// settings: queue enabled, merging to "main", integration branches on,
// conflicts assigned back, tests on (with no test command set), merged
// branches deleted, one flaky-test retry, a 30 second poll interval and a
// single MR processed at a time. Every call returns a new value.
func DefaultMergeQueueConfig() *MergeQueueConfig {
	cfg := new(MergeQueueConfig)

	// Queue behaviour.
	cfg.Enabled = true
	cfg.TargetBranch = "main"
	cfg.IntegrationBranches = true
	cfg.OnConflict = "assign_back"
	cfg.DeleteMergedBranches = true

	// Test execution.
	cfg.RunTests = true
	cfg.TestCommand = ""
	cfg.RetryFlakyTests = 1

	// Scheduling.
	cfg.PollInterval = 30 * time.Second
	cfg.MaxConcurrent = 1

	return cfg
}
// Engineer is the merge queue processor that polls for ready merge-requests
// and processes them according to the merge queue design.
type Engineer struct {
	// rig is the rig this Engineer serves; its Path locates config.json and
	// its Name appears in log output.
	rig *rig.Rig
	// beads is the issue-tracker client used to query, claim, reopen and
	// close merge-requests.
	beads *beads.Beads
	// config starts as DefaultMergeQueueConfig and is overlaid by LoadConfig.
	config *MergeQueueConfig
	// workDir is initialised to the rig path by NewEngineer.
	workDir string
	// stopCh is used for graceful shutdown: Stop closes it and Run returns
	// once it becomes readable.
	stopCh chan struct{}
}
// NewEngineer constructs an Engineer bound to rig r. The beads client and the
// working directory both point at r.Path, the configuration starts out as
// DefaultMergeQueueConfig, and a fresh stop channel is allocated.
func NewEngineer(r *rig.Rig) *Engineer {
	eng := &Engineer{rig: r, workDir: r.Path}
	eng.beads = beads.New(r.Path)
	eng.config = DefaultMergeQueueConfig()
	eng.stopCh = make(chan struct{})
	return eng
}
// LoadConfig loads merge queue configuration from the rig's config.json.
//
// Only the "merge_queue" object of the file is consulted. Each key present in
// that object overrides the corresponding field of e.config; keys that are
// absent leave the current (default) value untouched. A missing config.json or
// a missing merge_queue section is not an error.
//
// poll_interval is a duration string (e.g. "30s"). It must parse with
// time.ParseDuration and be strictly positive, because Run hands it to
// time.NewTicker, which panics on a non-positive interval.
//
// Returns an error if the file cannot be read (other than not existing), if
// the JSON is malformed, or if poll_interval is invalid. On error e.config is
// left unmodified.
func (e *Engineer) LoadConfig() error {
	configPath := filepath.Join(e.rig.Path, "config.json")
	data, err := os.ReadFile(configPath)
	if err != nil {
		if os.IsNotExist(err) {
			// Use defaults if no config file
			return nil
		}
		return fmt.Errorf("reading config: %w", err)
	}

	// Parse config file to extract merge_queue section
	var rawConfig struct {
		MergeQueue json.RawMessage `json:"merge_queue"`
	}
	if err := json.Unmarshal(data, &rawConfig); err != nil {
		return fmt.Errorf("parsing config: %w", err)
	}
	if rawConfig.MergeQueue == nil {
		// No merge_queue section, use defaults
		return nil
	}

	// Pointer fields distinguish "key absent" (nil) from "key set to the zero
	// value", so that absent keys keep their defaults. poll_interval is read
	// as a string and converted to a Duration below.
	var mqRaw struct {
		Enabled              *bool   `json:"enabled"`
		TargetBranch         *string `json:"target_branch"`
		IntegrationBranches  *bool   `json:"integration_branches"`
		OnConflict           *string `json:"on_conflict"`
		RunTests             *bool   `json:"run_tests"`
		TestCommand          *string `json:"test_command"`
		DeleteMergedBranches *bool   `json:"delete_merged_branches"`
		RetryFlakyTests      *int    `json:"retry_flaky_tests"`
		PollInterval         *string `json:"poll_interval"`
		MaxConcurrent        *int    `json:"max_concurrent"`
	}
	if err := json.Unmarshal(rawConfig.MergeQueue, &mqRaw); err != nil {
		return fmt.Errorf("parsing merge_queue config: %w", err)
	}

	// Validate everything that can fail BEFORE mutating e.config, so an
	// invalid file never leaves the configuration half-applied.
	var pollInterval time.Duration
	if mqRaw.PollInterval != nil {
		dur, err := time.ParseDuration(*mqRaw.PollInterval)
		if err != nil {
			return fmt.Errorf("invalid poll_interval %q: %w", *mqRaw.PollInterval, err)
		}
		if dur <= 0 {
			// time.NewTicker panics on non-positive durations.
			return fmt.Errorf("invalid poll_interval %q: must be greater than zero", *mqRaw.PollInterval)
		}
		pollInterval = dur
	}

	// Apply non-nil values to config (preserving defaults for missing fields)
	if mqRaw.Enabled != nil {
		e.config.Enabled = *mqRaw.Enabled
	}
	if mqRaw.TargetBranch != nil {
		e.config.TargetBranch = *mqRaw.TargetBranch
	}
	if mqRaw.IntegrationBranches != nil {
		e.config.IntegrationBranches = *mqRaw.IntegrationBranches
	}
	if mqRaw.OnConflict != nil {
		e.config.OnConflict = *mqRaw.OnConflict
	}
	if mqRaw.RunTests != nil {
		e.config.RunTests = *mqRaw.RunTests
	}
	if mqRaw.TestCommand != nil {
		e.config.TestCommand = *mqRaw.TestCommand
	}
	if mqRaw.DeleteMergedBranches != nil {
		e.config.DeleteMergedBranches = *mqRaw.DeleteMergedBranches
	}
	if mqRaw.RetryFlakyTests != nil {
		e.config.RetryFlakyTests = *mqRaw.RetryFlakyTests
	}
	if mqRaw.MaxConcurrent != nil {
		e.config.MaxConcurrent = *mqRaw.MaxConcurrent
	}
	if mqRaw.PollInterval != nil {
		e.config.PollInterval = pollInterval
	}
	return nil
}
// Config exposes the Engineer's merge queue configuration. The pointer
// returned is the one the Engineer itself holds, not a copy.
func (e *Engineer) Config() *MergeQueueConfig {
	current := e.config
	return current
}
// Run drives the Engineer's polling loop and blocks while doing so.
//
// It first loads configuration and refuses to start (returning an error) when
// loading fails or the merge queue is disabled. Otherwise it performs one
// processing pass right away and then one pass per poll interval. Errors from
// individual passes are printed and do not end the loop. Run returns nil once
// ctx is cancelled or Stop() has been called.
func (e *Engineer) Run(ctx context.Context) error {
	if err := e.LoadConfig(); err != nil {
		return fmt.Errorf("loading config: %w", err)
	}
	if enabled := e.config.Enabled; !enabled {
		return fmt.Errorf("merge queue is disabled in configuration")
	}

	interval := e.config.PollInterval
	fmt.Printf("[Engineer] Starting for rig %s (poll_interval=%s)\n",
		e.rig.Name, interval)

	poll := time.NewTicker(interval)
	defer poll.Stop()

	// runPass executes a single queue pass and reports, but does not
	// propagate, any error it produces.
	runPass := func() {
		if err := e.processOnce(ctx); err != nil {
			fmt.Printf("[Engineer] Error: %v\n", err)
		}
	}

	// First pass happens immediately rather than after the first tick.
	runPass()

	for {
		select {
		case <-poll.C:
			runPass()
		case <-e.stopCh:
			fmt.Println("[Engineer] Shutting down (stop signal)")
			return nil
		case <-ctx.Done():
			fmt.Println("[Engineer] Shutting down (context cancelled)")
			return nil
		}
	}
}
// Stop signals the Engineer to stop processing. This is a non-blocking call.
//
// Stop closes the stop channel that Run selects on. Calling Stop again after
// the channel has been closed is a no-op; closing an already-closed channel
// would otherwise panic. Stop on an Engineer whose stop channel was never
// initialised is also a no-op.
//
// The closed-check and the close are not atomic, so Stop must not be called
// from several goroutines at the same time.
func (e *Engineer) Stop() {
	if e.stopCh == nil {
		return
	}
	select {
	case <-e.stopCh:
		// Nothing ever sends on stopCh, so a successful receive means an
		// earlier Stop already closed it.
	default:
		close(e.stopCh)
	}
}
// processOnce runs a single pass of the Engineer loop:
//  1. Bail out quietly if ctx is already done.
//  2. Ask beads for ready merge-requests; return if there are none.
//  3. Take the first one (the list is used in the order beads returns it).
//  4. Claim it by moving it to in_progress.
//  5. Hand it to ProcessMR.
//  6. On success close it with a "merged: <commit>" reason; on failure
//     delegate to handleFailure.
//
// Only query and claim failures are returned as errors; a failure to close a
// merged MR is printed as a warning.
func (e *Engineer) processOnce(ctx context.Context) error {
	// Nothing to do once the context has been cancelled.
	if ctx.Err() != nil {
		return nil
	}

	// Query: bd ready --type=merge-request (filtered client-side)
	candidates, err := e.beads.ReadyWithType("merge-request")
	switch {
	case err != nil:
		return fmt.Errorf("querying ready merge-requests: %w", err)
	case len(candidates) == 0:
		return nil
	}

	// bd ready already returns sorted by priority then age, so first is best
	next := candidates[0]
	fmt.Printf("[Engineer] Processing: %s (%s)\n", next.ID, next.Title)

	// Claim: bd update <id> --status=in_progress
	claimed := "in_progress"
	claim := beads.UpdateOptions{Status: &claimed}
	if err := e.beads.Update(next.ID, claim); err != nil {
		return fmt.Errorf("claiming MR %s: %w", next.ID, err)
	}

	// Process (delegate to ProcessMR - implementation in separate issue gt-3x1.2)
	outcome := e.ProcessMR(ctx, next)
	if !outcome.Success {
		// Failure handling (detailed implementation in gt-3x1.4)
		e.handleFailure(next, outcome)
		return nil
	}

	// Close with merged reason
	closeReason := "merged: " + outcome.MergeCommit
	if err := e.beads.CloseWithReason(closeReason, next.ID); err != nil {
		fmt.Printf("[Engineer] Warning: failed to close MR %s: %v\n", next.ID, err)
	}
	fmt.Printf("[Engineer] ✓ Merged: %s\n", next.ID)
	return nil
}
// ProcessResult contains the result of processing a merge request.
type ProcessResult struct {
	// Success selects the follow-up in processOnce: true closes the MR as
	// merged, false routes it to handleFailure.
	Success bool
	// MergeCommit is embedded in the "merged: <commit>" close reason on success.
	MergeCommit string
	// Error is the human-readable failure description printed by handleFailure.
	Error string
	// Conflict presumably marks a failure caused by a merge conflict; it is
	// neither set nor read by the Engineer code visible here (TODO confirm).
	Conflict bool
	// TestsFailed presumably marks a failure caused by a failing test run; it
	// is neither set nor read by the Engineer code visible here (TODO confirm).
	TestsFailed bool
}
// ProcessMR handles one merge request.
//
// Placeholder pending gt-3x1.2: it extracts the MR fields from the issue,
// prints the branch, target and worker it would act on, and then always
// reports failure. If the issue carries no MR fields the failure says so;
// otherwise the failure says the implementation is incomplete.
func (e *Engineer) ProcessMR(ctx context.Context, mr *beads.Issue) ProcessResult {
	// Parse MR fields from description
	fields := beads.ParseMRFields(mr)
	if fields != nil {
		// For now, just log what we would do
		// Full implementation in gt-3x1.2: Fetch and conflict check
		fmt.Printf("[Engineer] Would process:\n")
		fmt.Printf(" Branch: %s\n", fields.Branch)
		fmt.Printf(" Target: %s\n", fields.Target)
		fmt.Printf(" Worker: %s\n", fields.Worker)

		// Return failure for now - actual implementation in gt-3x1.2
		var pending ProcessResult
		pending.Success = false
		pending.Error = "ProcessMR not fully implemented (see gt-3x1.2)"
		return pending
	}

	var missing ProcessResult
	missing.Success = false
	missing.Error = "no MR fields found in description"
	return missing
}
// handleFailure deals with a merge request whose processing failed.
//
// Placeholder pending gt-3x1.4: it moves the MR back to "open" so it can be
// reworked (printing a warning if that update fails) and prints the failure
// reason. Assigning back to the worker and labelling are not done yet.
func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
	// Reopen the MR (back to open status for rework)
	reopened := "open"
	reopenErr := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &reopened})
	if reopenErr != nil {
		fmt.Printf("[Engineer] Warning: failed to reopen MR %s: %v\n", mr.ID, reopenErr)
	}

	// Log the failure
	fmt.Printf("[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)

	// Full failure handling (assign back to worker, labels) in gt-3x1.4
}
+209
View File
@@ -0,0 +1,209 @@
package refinery
import (
"encoding/json"
"os"
"path/filepath"
"testing"
"time"
"github.com/steveyegge/gastown/internal/rig"
)
// TestDefaultMergeQueueConfig pins the stock values returned by
// DefaultMergeQueueConfig that the rest of the package relies on.
func TestDefaultMergeQueueConfig(t *testing.T) {
	defaults := DefaultMergeQueueConfig()

	if enabled := defaults.Enabled; !enabled {
		t.Error("expected Enabled to be true by default")
	}
	if got := defaults.TargetBranch; got != "main" {
		t.Errorf("expected TargetBranch to be 'main', got %q", got)
	}
	if got := defaults.PollInterval; got != 30*time.Second {
		t.Errorf("expected PollInterval to be 30s, got %v", got)
	}
	if got := defaults.MaxConcurrent; got != 1 {
		t.Errorf("expected MaxConcurrent to be 1, got %d", got)
	}
	if got := defaults.OnConflict; got != "assign_back" {
		t.Errorf("expected OnConflict to be 'assign_back', got %q", got)
	}
}
func TestEngineer_LoadConfig_NoFile(t *testing.T) {
// Create a temp directory without config.json
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(tmpDir) }()
r := &rig.Rig{
Name: "test-rig",
Path: tmpDir,
}
e := NewEngineer(r)
// Should not error with missing config file
if err := e.LoadConfig(); err != nil {
t.Errorf("unexpected error with missing config: %v", err)
}
// Should use defaults
if e.config.PollInterval != 30*time.Second {
t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
}
}
// TestEngineer_LoadConfig_WithMergeQueue verifies that values present in the
// merge_queue section of config.json override the defaults, and that fields
// the section omits keep their default values.
func TestEngineer_LoadConfig_WithMergeQueue(t *testing.T) {
	// The testing package removes this directory when the test finishes.
	tmpDir := t.TempDir()

	// Write config file
	config := map[string]interface{}{
		"type":    "rig",
		"version": 1,
		"name":    "test-rig",
		"merge_queue": map[string]interface{}{
			"enabled":        true,
			"target_branch":  "develop",
			"poll_interval":  "10s",
			"max_concurrent": 2,
			"run_tests":      false,
			"test_command":   "make test",
		},
	}
	data, err := json.MarshalIndent(config, "", " ")
	if err != nil {
		t.Fatalf("marshalling test config: %v", err)
	}
	if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
		t.Fatal(err)
	}

	r := &rig.Rig{
		Name: "test-rig",
		Path: tmpDir,
	}
	e := NewEngineer(r)

	// The value checks below only make sense if loading succeeded.
	if err := e.LoadConfig(); err != nil {
		t.Fatalf("unexpected error loading config: %v", err)
	}

	// Check that config values were loaded
	if e.config.TargetBranch != "develop" {
		t.Errorf("expected TargetBranch 'develop', got %q", e.config.TargetBranch)
	}
	if e.config.PollInterval != 10*time.Second {
		t.Errorf("expected PollInterval 10s, got %v", e.config.PollInterval)
	}
	if e.config.MaxConcurrent != 2 {
		t.Errorf("expected MaxConcurrent 2, got %d", e.config.MaxConcurrent)
	}
	if e.config.RunTests {
		t.Errorf("expected RunTests false, got %v", e.config.RunTests)
	}
	if e.config.TestCommand != "make test" {
		t.Errorf("expected TestCommand 'make test', got %q", e.config.TestCommand)
	}

	// Check that defaults are preserved for unspecified fields
	if e.config.OnConflict != "assign_back" {
		t.Errorf("expected OnConflict default 'assign_back', got %q", e.config.OnConflict)
	}
}
// TestEngineer_LoadConfig_NoMergeQueueSection verifies that a config.json
// lacking a merge_queue section loads cleanly and leaves defaults in place.
func TestEngineer_LoadConfig_NoMergeQueueSection(t *testing.T) {
	// The testing package removes this directory when the test finishes.
	tmpDir := t.TempDir()

	// Write config file without merge_queue
	config := map[string]interface{}{
		"type":    "rig",
		"version": 1,
		"name":    "test-rig",
	}
	data, err := json.MarshalIndent(config, "", " ")
	if err != nil {
		t.Fatalf("marshalling test config: %v", err)
	}
	if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
		t.Fatal(err)
	}

	r := &rig.Rig{
		Name: "test-rig",
		Path: tmpDir,
	}
	e := NewEngineer(r)

	// The default check below only makes sense if loading succeeded.
	if err := e.LoadConfig(); err != nil {
		t.Fatalf("unexpected error loading config: %v", err)
	}

	// Should use all defaults
	if e.config.PollInterval != 30*time.Second {
		t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
	}
}
// TestEngineer_LoadConfig_InvalidPollInterval verifies that a poll_interval
// which is not a valid duration string makes LoadConfig return an error.
func TestEngineer_LoadConfig_InvalidPollInterval(t *testing.T) {
	// The testing package removes this directory when the test finishes.
	tmpDir := t.TempDir()

	config := map[string]interface{}{
		"merge_queue": map[string]interface{}{
			"poll_interval": "not-a-duration",
		},
	}
	data, err := json.MarshalIndent(config, "", " ")
	if err != nil {
		t.Fatalf("marshalling test config: %v", err)
	}
	if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
		t.Fatal(err)
	}

	r := &rig.Rig{
		Name: "test-rig",
		Path: tmpDir,
	}
	e := NewEngineer(r)

	if err := e.LoadConfig(); err == nil {
		t.Error("expected error for invalid poll_interval")
	}
}
// TestNewEngineer verifies that NewEngineer wires up every field: the rig it
// was given, a beads client, a default config and a stop channel.
func TestNewEngineer(t *testing.T) {
	r := &rig.Rig{
		Name: "test-rig",
		Path: "/tmp/test-rig",
	}
	e := NewEngineer(r)

	// Each entry pairs an invariant with the message reported when it does
	// not hold; all entries are evaluated so every broken field is reported.
	invariants := []struct {
		holds   bool
		failure string
	}{
		{e.rig == r, "expected rig to be set"},
		{e.beads != nil, "expected beads client to be initialized"},
		{e.config != nil, "expected config to be initialized with defaults"},
		{e.stopCh != nil, "expected stopCh to be initialized"},
	}
	for _, inv := range invariants {
		if !inv.holds {
			t.Error(inv.failure)
		}
	}
}
+401 -273
View File
@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
@@ -13,9 +12,9 @@ import (
"strings"
"time"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/tmux"
)
// Common errors
@@ -44,6 +43,11 @@ func (m *Manager) stateFile() string {
return filepath.Join(m.rig.Path, ".gastown", "refinery.json")
}
// sessionName derives the tmux session name used for this rig's refinery,
// in the form "gt-<rig name>-refinery".
func (m *Manager) sessionName() string {
	const pattern = "gt-%s-refinery"
	return fmt.Sprintf(pattern, m.rig.Name)
}
// loadState loads refinery state from disk.
func (m *Manager) loadState() (*Refinery, error) {
data, err := os.ReadFile(m.stateFile())
@@ -87,13 +91,35 @@ func (m *Manager) Status() (*Refinery, error) {
return nil, err
}
// If running, verify process is still alive
if ref.State == StateRunning && ref.PID > 0 {
if !processExists(ref.PID) {
ref.State = StateStopped
ref.PID = 0
m.saveState(ref)
// Check if tmux session exists
t := tmux.NewTmux()
sessionID := m.sessionName()
sessionRunning, _ := t.HasSession(sessionID)
// If tmux session is running, refinery is running
if sessionRunning {
if ref.State != StateRunning {
// Update state to match reality
now := time.Now()
ref.State = StateRunning
if ref.StartedAt == nil {
ref.StartedAt = &now
}
_ = m.saveState(ref)
}
return ref, nil
}
// If state says running but tmux session doesn't exist, check PID
if ref.State == StateRunning {
if ref.PID > 0 && processExists(ref.PID) {
// Process is still running (foreground mode without tmux)
return ref, nil
}
// Neither session nor process exists - mark as stopped
ref.State = StateStopped
ref.PID = 0
_ = m.saveState(ref)
}
return ref, nil
@@ -101,33 +127,63 @@ func (m *Manager) Status() (*Refinery, error) {
// Start starts the refinery.
// If foreground is true, runs in the current process (blocking).
// Otherwise, spawns a background process.
// Otherwise, spawns a tmux session running the refinery in foreground mode.
func (m *Manager) Start(foreground bool) error {
ref, err := m.loadState()
if err != nil {
return err
}
// Check if already running via tmux session
t := tmux.NewTmux()
sessionID := m.sessionName()
running, _ := t.HasSession(sessionID)
if running {
return ErrAlreadyRunning
}
// Also check via PID for backwards compatibility
if ref.State == StateRunning && ref.PID > 0 && processExists(ref.PID) {
return ErrAlreadyRunning
}
now := time.Now()
ref.State = StateRunning
ref.StartedAt = &now
ref.PID = os.Getpid() // For foreground mode; background would set actual PID
if err := m.saveState(ref); err != nil {
return err
}
if foreground {
// Running in foreground - update state and run
now := time.Now()
ref.State = StateRunning
ref.StartedAt = &now
ref.PID = os.Getpid()
if err := m.saveState(ref); err != nil {
return err
}
// Run the processing loop (blocking)
return m.run(ref)
}
// Background mode: spawn a new process
// For MVP, we just mark as running - actual daemon implementation in gt-ov2
// Background mode: spawn a tmux session running the refinery
if err := t.NewSession(sessionID, m.workDir); err != nil {
return fmt.Errorf("creating tmux session: %w", err)
}
// Set environment variables
_ = t.SetEnvironment(sessionID, "GT_RIG", m.rig.Name)
_ = t.SetEnvironment(sessionID, "GT_REFINERY", "1")
// Apply theme (same as rig polecats)
theme := tmux.AssignTheme(m.rig.Name)
_ = t.ConfigureGasTownSession(sessionID, theme, m.rig.Name, "refinery", "refinery")
// Send the command to start refinery in foreground mode
// The foreground mode handles state updates and the processing loop
command := fmt.Sprintf("gt refinery start %s --foreground", m.rig.Name)
if err := t.SendKeys(sessionID, command); err != nil {
// Clean up the session on failure
_ = t.KillSession(sessionID)
return fmt.Errorf("starting refinery: %w", err)
}
return nil
}
@@ -138,15 +194,26 @@ func (m *Manager) Stop() error {
return err
}
if ref.State != StateRunning {
// Check if tmux session exists
t := tmux.NewTmux()
sessionID := m.sessionName()
sessionRunning, _ := t.HasSession(sessionID)
// If neither state nor session indicates running, it's not running
if ref.State != StateRunning && !sessionRunning {
return ErrNotRunning
}
// If we have a PID, try to stop it gracefully
if ref.PID > 0 && ref.PID != os.Getpid() {
// Kill tmux session if it exists
if sessionRunning {
_ = t.KillSession(sessionID)
}
// If we have a PID and it's a different process, try to stop it gracefully
if ref.PID > 0 && ref.PID != os.Getpid() && processExists(ref.PID) {
// Send SIGTERM
if proc, err := os.FindProcess(ref.PID); err == nil {
proc.Signal(os.Interrupt)
_ = proc.Signal(os.Interrupt)
}
}
@@ -258,15 +325,13 @@ func (m *Manager) run(ref *Refinery) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Process queue
if err := m.ProcessQueue(); err != nil {
fmt.Printf("Queue processing error: %v\n", err)
}
for range ticker.C {
// Process queue
if err := m.ProcessQueue(); err != nil {
fmt.Printf("Queue processing error: %v\n", err)
}
}
return nil
}
// ProcessQueue processes all pending merge requests.
@@ -297,50 +362,44 @@ func (m *Manager) ProcessQueue() error {
// MergeResult contains the result of a merge attempt.
type MergeResult struct {
Success bool
MergeCommit string // SHA of merge commit on success
Error string
FailureType FailureType
RetryCount int // Number of retries attempted
Conflict bool
TestsFailed bool
}
// ProcessMR processes a single merge request.
func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
return m.processMRWithRetry(mr, 0)
}
// processMRWithRetry processes a merge request with retry support.
func (m *Manager) processMRWithRetry(mr *MergeRequest, retryCount int) MergeResult {
ref, _ := m.loadState()
config := m.getMergeConfig()
// Claim the MR (open → in_progress)
if err := mr.Claim(); err != nil {
return MergeResult{Error: fmt.Sprintf("cannot claim MR: %v", err)}
}
ref.CurrentMR = mr
m.saveState(ref)
_ = m.saveState(ref)
result := MergeResult{RetryCount: retryCount}
result := MergeResult{}
// 1. Fetch the branch
if err := m.gitRun("fetch", "origin", mr.Branch); err != nil {
result.Error = fmt.Sprintf("fetch failed: %v", err)
result.FailureType = FailureFetch
m.handleFailure(mr, result)
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
// 2. Attempt merge to target branch
// First, checkout target
// 2. Checkout target branch
if err := m.gitRun("checkout", mr.TargetBranch); err != nil {
result.Error = fmt.Sprintf("checkout target failed: %v", err)
result.FailureType = FailureCheckout
m.handleFailure(mr, result)
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
// Pull latest
m.gitRun("pull", "origin", mr.TargetBranch) // Ignore errors
_ = m.gitRun("pull", "origin", mr.TargetBranch) // Ignore errors
// Merge
// 3. Merge
err := m.gitRun("merge", "--no-ff", "-m",
fmt.Sprintf("Merge %s from %s", mr.Branch, mr.Worker),
"origin/"+mr.Branch)
@@ -348,258 +407,63 @@ func (m *Manager) processMRWithRetry(mr *MergeRequest, retryCount int) MergeResu
if err != nil {
errStr := err.Error()
if strings.Contains(errStr, "CONFLICT") || strings.Contains(errStr, "conflict") {
result.FailureType = FailureConflict
result.Conflict = true
result.Error = "merge conflict"
// Abort the merge
m.gitRun("merge", "--abort")
m.handleFailure(mr, result)
_ = m.gitRun("merge", "--abort")
m.completeMR(mr, "", "merge conflict - polecat must rebase") // Reopen for rebase
// Notify worker about conflict
m.notifyWorkerConflict(mr)
return result
}
result.Error = fmt.Sprintf("merge failed: %v", err)
m.handleFailure(mr, result)
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
// 3. Run tests if configured
testCmd := m.getTestCommand()
if testCmd != "" {
if err := m.runTests(testCmd); err != nil {
// Reset to before merge
m.gitRun("reset", "--hard", "HEAD~1")
// Check if this might be a flaky test (retry once)
retryFlakyTests := m.getRetryFlakyTests()
if retryCount < retryFlakyTests {
log.Printf("[MQ] Test failure on attempt %d, retrying (may be flaky)...", retryCount+1)
// Reopen the MR for retry
mr.Reopen()
return m.processMRWithRetry(mr, retryCount+1)
}
result.FailureType = FailureTestsFail
// 4. Run tests if configured
if config.RunTests && config.TestCommand != "" {
if err := m.runTests(config.TestCommand); err != nil {
result.TestsFailed = true
result.Error = fmt.Sprintf("tests failed: %v", err)
m.handleFailure(mr, result)
// Reset to before merge
_ = m.gitRun("reset", "--hard", "HEAD~1")
m.completeMR(mr, "", result.Error) // Reopen for fixes
return result
}
}
// 4. Push with retry for transient failures
pushErr := m.pushWithRetry(mr.TargetBranch, 3)
if pushErr != nil {
result.Error = fmt.Sprintf("push failed: %v", pushErr)
result.FailureType = FailurePushFail
// 5. Push with retry logic
if err := m.pushWithRetry(mr.TargetBranch, config); err != nil {
result.Error = fmt.Sprintf("push failed: %v", err)
// Reset to before merge
m.gitRun("reset", "--hard", "HEAD~1")
m.handleFailure(mr, result)
_ = m.gitRun("reset", "--hard", "HEAD~1")
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
// 6. Get merge commit SHA
mergeCommit, err := m.gitOutput("rev-parse", "HEAD")
if err != nil {
mergeCommit = "" // Non-fatal, continue
}
// Success!
result.Success = true
result.MergeCommit = mergeCommit
m.completeMR(mr, CloseReasonMerged, "")
// Notify worker of success
m.notifyWorkerMerged(mr)
// Optionally delete the merged branch
m.gitRun("push", "origin", "--delete", mr.Branch)
if config.DeleteMergedBranches {
_ = m.gitRun("push", "origin", "--delete", mr.Branch)
}
return result
}
// pushWithRetry attempts to push with exponential backoff.
func (m *Manager) pushWithRetry(branch string, maxRetries int) error {
var lastErr error
for i := 0; i < maxRetries; i++ {
if i > 0 {
// Exponential backoff: 1s, 2s, 4s...
backoff := time.Duration(1<<uint(i-1)) * time.Second
log.Printf("[MQ] Push attempt %d failed, retrying in %v...", i, backoff)
time.Sleep(backoff)
}
lastErr = m.gitRun("push", "origin", branch)
if lastErr == nil {
return nil
}
}
return lastErr
}
// handleFailure handles merge failures by updating the source issue and notifying the worker.
func (m *Manager) handleFailure(mr *MergeRequest, result MergeResult) {
// Log the failure details
log.Printf("[MQ] Merge failed for %s: type=%s, error=%s, retries=%d",
mr.Branch, result.FailureType, result.Error, result.RetryCount)
// Reopen the MR (in_progress → open)
m.completeMR(mr, "", result.Error)
// Update the source issue in beads if we have an issue ID
if mr.IssueID != "" && result.FailureType.ShouldAssignToWorker() {
m.updateSourceIssue(mr, result)
}
// Send notification to worker
m.notifyWorkerFailure(mr, result)
}
// updateSourceIssue updates the beads issue with failure info.
func (m *Manager) updateSourceIssue(mr *MergeRequest, result MergeResult) {
// Find the beads directory - use the rig's canonical beads
beadsDir := m.findBeadsDir()
if beadsDir == "" {
log.Printf("[MQ] Warning: could not find beads directory to update issue %s", mr.IssueID)
return
}
bd := beads.New(beadsDir)
// Build update options
opts := beads.UpdateOptions{}
// Set status back to open
status := "open"
opts.Status = &status
// Assign back to worker
workerAddr := fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker)
opts.Assignee = &workerAddr
// Add the failure label
label := result.FailureType.FailureLabel()
if label != "" {
opts.AddLabels = []string{label}
}
// Perform the update
if err := bd.Update(mr.IssueID, opts); err != nil {
log.Printf("[MQ] Warning: failed to update issue %s: %v", mr.IssueID, err)
} else {
log.Printf("[MQ] Updated issue %s: status=open, assignee=%s, label=%s",
mr.IssueID, workerAddr, label)
}
}
// findBeadsDir locates the beads directory for this rig.
func (m *Manager) findBeadsDir() string {
// First try the rig's own .beads directory
beadsPath := filepath.Join(m.rig.Path, ".beads")
if _, err := os.Stat(beadsPath); err == nil {
return m.rig.Path
}
// Fall back to town root if we can find it
townRoot := findTownRoot(m.rig.Path)
if townRoot != "" {
// Check if rig has a separate beads in polecats dir
rigBeadsPath := filepath.Join(townRoot, m.rig.Name, ".beads")
if _, err := os.Stat(rigBeadsPath); err == nil {
return filepath.Join(townRoot, m.rig.Name)
}
}
return ""
}
// notifyWorkerFailure sends a failure notification to the worker.
func (m *Manager) notifyWorkerFailure(mr *MergeRequest, result MergeResult) {
router := mail.NewRouter(m.workDir)
var subject, body string
switch result.FailureType {
case FailureConflict:
subject = "Merge conflict - rebase required"
body = fmt.Sprintf(`Your branch %s has conflicts with %s.
Please rebase your changes:
git fetch origin
git rebase origin/%s
git push -f
Then the Refinery will retry the merge.
Issue: %s
Error: %s`,
mr.Branch, mr.TargetBranch, mr.TargetBranch, mr.IssueID, result.Error)
case FailureTestsFail:
subject = "Tests failed - fix required"
body = fmt.Sprintf(`Your branch %s failed tests after merging with %s.
Please fix the failing tests and push your changes.
Issue: %s
Error: %s
Retries attempted: %d`,
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error, result.RetryCount)
case FailureBuildFail:
subject = "Build failed - fix required"
body = fmt.Sprintf(`Your branch %s failed to build after merging with %s.
Please fix the build errors and push your changes.
Issue: %s
Error: %s`,
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error)
case FailurePushFail:
subject = "Push failed - retrying"
body = fmt.Sprintf(`Push to %s failed after merging your branch %s.
This may be a transient issue. The Refinery will retry.
If this persists, please contact the team.
Issue: %s
Error: %s`,
mr.TargetBranch, mr.Branch, mr.IssueID, result.Error)
default:
subject = "Merge failed"
body = fmt.Sprintf(`Merge of branch %s to %s failed.
Issue: %s
Error: %s
Please check the error and try again.`,
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error)
}
msg := &mail.Message{
From: fmt.Sprintf("%s/refinery", m.rig.Name),
To: fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker),
Subject: subject,
Body: body,
Priority: mail.PriorityHigh,
}
router.Send(msg)
}
// getRetryFlakyTests returns the number of retries for flaky tests from config.
func (m *Manager) getRetryFlakyTests() int {
configPath := filepath.Join(m.rig.Path, ".gastown", "config.json")
data, err := os.ReadFile(configPath)
if err != nil {
return 1 // Default: retry once
}
var config struct {
MergeQueue struct {
RetryFlakyTests int `json:"retry_flaky_tests"`
} `json:"merge_queue"`
}
if err := json.Unmarshal(data, &config); err != nil {
return 1
}
if config.MergeQueue.RetryFlakyTests > 0 {
return config.MergeQueue.RetryFlakyTests
}
return 1
}
// completeMR marks an MR as complete and updates stats.
// For success, pass closeReason (e.g., CloseReasonMerged).
// For failures that should return to open, pass empty closeReason.
@@ -638,7 +502,7 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s
ref.Stats.TodayFailed++
}
m.saveState(ref)
_ = m.saveState(ref)
}
// getTestCommand returns the test command if configured.
@@ -699,6 +563,89 @@ func (m *Manager) gitRun(args ...string) error {
return nil
}
// gitOutput runs git with the given arguments inside the manager's working
// directory and returns its standard output with surrounding whitespace
// trimmed. When the command fails, the returned error carries git's trimmed
// standard error text if there is any, and the raw execution error otherwise.
func (m *Manager) gitOutput(args ...string) (string, error) {
	var outBuf, errBuf bytes.Buffer

	git := exec.Command("git", args...)
	git.Dir = m.workDir
	git.Stdout, git.Stderr = &outBuf, &errBuf

	runErr := git.Run()
	if runErr == nil {
		return strings.TrimSpace(outBuf.String()), nil
	}

	// Prefer git's own diagnostic over the generic exit-status error.
	if detail := strings.TrimSpace(errBuf.String()); detail != "" {
		return "", errors.New(detail)
	}
	return "", runErr
}
// getMergeConfig loads the merge configuration from disk.
//
// It starts from DefaultMergeConfig and overlays the "merge_queue" object
// found in <rig>/.gastown/config.json. The overlay is decoded on top of the
// defaults, so a setting that merge_queue omits keeps its default value
// instead of being reset to the Go zero value. (This assumes MergeConfig
// decodes with plain encoding/json field semantics; confirm if it defines a
// custom UnmarshalJSON.)
//
// When merge_queue is present, a PushRetryCount or PushRetryDelayMs of zero is
// replaced by 3 and 1000 respectively. The legacy top-level "test_command" is
// used only when the resulting TestCommand is empty.
//
// Any read or parse failure yields the unmodified defaults; the function never
// reports an error.
func (m *Manager) getMergeConfig() MergeConfig {
	config := DefaultMergeConfig()

	// Check for .gastown/config.json with merge_queue settings
	configPath := filepath.Join(m.rig.Path, ".gastown", "config.json")
	data, err := os.ReadFile(configPath)
	if err != nil {
		return config
	}

	var rawConfig struct {
		// Kept raw so it can be decoded over the defaults below.
		MergeQueue json.RawMessage `json:"merge_queue"`
		// Legacy field for backwards compatibility
		TestCommand string `json:"test_command"`
	}
	if err := json.Unmarshal(data, &rawConfig); err != nil {
		return config
	}

	// Apply merge_queue config if present. An explicit JSON null is captured
	// by RawMessage as the literal "null" and is treated as "not present".
	if section := rawConfig.MergeQueue; len(section) > 0 && string(section) != "null" {
		if err := json.Unmarshal(section, &config); err != nil {
			// config may be partially overwritten at this point; hand back
			// pristine defaults rather than a half-applied mix.
			return DefaultMergeConfig()
		}
		// Ensure defaults for zero values
		if config.PushRetryCount == 0 {
			config.PushRetryCount = 3
		}
		if config.PushRetryDelayMs == 0 {
			config.PushRetryDelayMs = 1000
		}
	}

	// Legacy: use test_command if merge_queue not set
	if rawConfig.TestCommand != "" && config.TestCommand == "" {
		config.TestCommand = rawConfig.TestCommand
	}
	return config
}
// pushWithRetry pushes to the target branch with exponential backoff retry.
//
// The push is always attempted once. After a failure it is retried up to
// config.PushRetryCount more times, sleeping config.PushRetryDelayMs
// milliseconds before the first retry and doubling the sleep before each
// subsequent one. A negative PushRetryCount is treated as zero retries, so the
// initial push still happens.
//
// Returns nil as soon as a push succeeds. Otherwise returns an error that
// wraps the last push error, so callers can inspect it with errors.Is/As.
func (m *Manager) pushWithRetry(targetBranch string, config MergeConfig) error {
	retries := config.PushRetryCount
	if retries < 0 {
		// Without this clamp the loop body would never run: no push would be
		// attempted and the final error would be built from a nil lastErr.
		retries = 0
	}

	delay := time.Duration(config.PushRetryDelayMs) * time.Millisecond
	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			fmt.Printf("Push retry %d/%d after %v\n", attempt, retries, delay)
			time.Sleep(delay)
			delay *= 2 // Exponential backoff
		}
		if lastErr = m.gitRun("push", "origin", targetBranch); lastErr == nil {
			return nil // Success
		}
	}
	return fmt.Errorf("push failed after %d retries: %w", retries, lastErr)
}
// processExists checks if a process with the given PID exists.
func processExists(pid int) bool {
proc, err := os.FindProcess(pid)
@@ -726,6 +673,27 @@ func formatAge(t time.Time) string {
return fmt.Sprintf("%dd ago", int(d.Hours()/24))
}
// notifyWorkerConflict sends a conflict notification to a polecat.
//
// The message is addressed from "<rig>/refinery" to "<rig>/<mr.Worker>" at
// high priority and tells the worker to rebase mr.Branch onto
// origin/<mr.TargetBranch>. The error returned by the mail router is
// discarded, so a delivery failure is not reported to the caller.
func (m *Manager) notifyWorkerConflict(mr *MergeRequest) {
	router := mail.NewRouter(m.workDir)
	msg := &mail.Message{
		From:    fmt.Sprintf("%s/refinery", m.rig.Name),
		To:      fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker),
		Subject: "Merge conflict - rebase required",
		// The three %s verbs are: source branch, target branch, target branch.
		Body: fmt.Sprintf(`Your branch %s has conflicts with %s.
Please rebase your changes:
git fetch origin
git rebase origin/%s
git push -f
Then the Refinery will retry the merge.`,
			mr.Branch, mr.TargetBranch, mr.TargetBranch),
		Priority: mail.PriorityHigh,
	}
	_ = router.Send(msg)
}
// notifyWorkerMerged sends a success notification to a polecat.
func (m *Manager) notifyWorkerMerged(mr *MergeRequest) {
router := mail.NewRouter(m.workDir)
@@ -739,7 +707,167 @@ Issue: %s
Thank you for your contribution!`,
mr.Branch, mr.TargetBranch, mr.IssueID),
}
router.Send(msg)
_ = router.Send(msg)
}
// Sentinel errors returned by the merge request lookup and retry operations.
var (
	// ErrMRNotFound is returned by GetMR, FindMR and Retry when no merge
	// request matches the requested identifier.
	ErrMRNotFound = errors.New("merge request not found")
	// ErrMRNotFailed is returned by Retry when the merge request is not open
	// with a recorded error, i.e. there is nothing to retry.
	ErrMRNotFailed = errors.New("merge request has not failed")
)
// GetMR looks up a merge request by exact ID in the state returned by
// loadState. The MR currently being processed is checked first, then the
// pending set. It returns ErrMRNotFound when neither holds the ID, or the
// error from loadState if the state cannot be loaded.
func (m *Manager) GetMR(id string) (*MergeRequest, error) {
	state, err := m.loadState()
	if err != nil {
		return nil, err
	}

	if current := state.CurrentMR; current != nil && current.ID == id {
		return current, nil
	}

	// Indexing a nil map is safe in Go and simply reports "not present".
	if pending, ok := state.PendingMRs[id]; ok {
		return pending, nil
	}

	return nil, ErrMRNotFound
}
// FindMR finds a merge request in the queue by ID or branch name.
//
// Exact matches take priority over partial ones. An item matches exactly when
// idOrBranch equals its ID, equals its branch, or equals its branch once the
// "polecat/" prefix is prepended to idOrBranch. If no item matches exactly,
// the first item whose ID contains idOrBranch as a substring is returned as a
// convenience.
//
// It returns ErrMRNotFound when nothing matches or when idOrBranch is empty,
// and passes through any error from Queue.
func (m *Manager) FindMR(idOrBranch string) (*MergeRequest, error) {
	// strings.Contains reports true for an empty substring, so an empty query
	// would otherwise select whichever MR happens to be first in the queue.
	if idOrBranch == "" {
		return nil, ErrMRNotFound
	}

	queue, err := m.Queue()
	if err != nil {
		return nil, err
	}

	prefixed := "polecat/" + idOrBranch
	var partial *MergeRequest
	for _, item := range queue {
		mr := item.MR
		// Exact match by ID or branch (with or without the polecat/ prefix).
		if mr.ID == idOrBranch || mr.Branch == idOrBranch || mr.Branch == prefixed {
			return mr, nil
		}
		// Remember the first partial ID match, but keep scanning so that an
		// exact match later in the queue still wins.
		if partial == nil && strings.Contains(mr.ID, idOrBranch) {
			partial = mr
		}
	}

	if partial != nil {
		return partial, nil
	}
	return nil, ErrMRNotFound
}
// Retry clears the recorded error on a failed pending merge request so it can
// be processed again, and saves the updated state.
//
// Only entries in PendingMRs are considered. It returns ErrMRNotFound when id
// is not pending and ErrMRNotFailed when the MR is not open with a non-empty
// Error. When processNow is true the MR is handed to ProcessMR right away and
// an unsuccessful result is returned as an error.
func (m *Manager) Retry(id string, processNow bool) error {
	state, err := m.loadState()
	if err != nil {
		return err
	}

	// A nil PendingMRs map yields a nil entry, handled the same as a miss.
	target := state.PendingMRs[id]
	if target == nil {
		return ErrMRNotFound
	}

	// "Failed" means still open but carrying an error from an earlier attempt.
	failed := target.Status == MROpen && target.Error != ""
	if !failed {
		return ErrMRNotFailed
	}

	// Dropping the error marks the MR as ready for another attempt.
	target.Error = ""
	if err := m.saveState(state); err != nil {
		return err
	}

	if !processNow {
		return nil
	}
	if outcome := m.ProcessMR(target); !outcome.Success {
		return fmt.Errorf("retry failed: %s", outcome.Error)
	}
	return nil
}
// RegisterMR stores mr in the pending set under mr.ID, replacing any entry
// already recorded for that ID, and saves the state. The pending map is
// created on first use.
func (m *Manager) RegisterMR(mr *MergeRequest) error {
	state, err := m.loadState()
	if err != nil {
		return err
	}

	pending := state.PendingMRs
	if pending == nil {
		// Writing to a nil map panics, so allocate it before the insert.
		pending = make(map[string]*MergeRequest)
		state.PendingMRs = pending
	}
	pending[mr.ID] = mr

	return m.saveState(state)
}
// RejectMR manually rejects the merge request located by FindMR.
//
// The MR is closed with CloseReasonRejected and reason is recorded in its
// Error field. When notify is true the worker is mailed the reason. The
// rejected MR is returned for display purposes. An MR that is already closed
// yields an error wrapping ErrClosedImmutable.
//
// NOTE(review): nothing in this function saves state after the change;
// confirm whether persistence is expected to happen elsewhere.
func (m *Manager) RejectMR(idOrBranch string, reason string, notify bool) (*MergeRequest, error) {
	target, err := m.FindMR(idOrBranch)
	if err != nil {
		return nil, err
	}

	// Only open or in-progress MRs can be rejected.
	if target.IsClosed() {
		return nil, fmt.Errorf("%w: MR is already closed with reason: %s", ErrClosedImmutable, target.CloseReason)
	}

	if closeErr := target.Close(CloseReasonRejected); closeErr != nil {
		return nil, fmt.Errorf("failed to close MR: %w", closeErr)
	}
	target.Error = reason

	if notify {
		m.notifyWorkerRejected(target, reason)
	}
	return target, nil
}
// notifyWorkerRejected mails the worker named in mr.Worker that its merge
// request was rejected, including the branch, issue ID and reason. The
// message is sent from "<rig>/refinery" with normal priority; the result of
// Send is deliberately discarded.
func (m *Manager) notifyWorkerRejected(mr *MergeRequest, reason string) {
	sender := fmt.Sprintf("%s/refinery", m.rig.Name)
	recipient := fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker)
	body := fmt.Sprintf(`Your merge request has been rejected.
Branch: %s
Issue: %s
Reason: %s
Please review the feedback and address the issues before resubmitting.`,
		mr.Branch, mr.IssueID, reason)

	notice := &mail.Message{
		From:     sender,
		To:       recipient,
		Subject:  "Merge request rejected",
		Body:     body,
		Priority: mail.PriorityNormal,
	}
	_ = mail.NewRouter(m.workDir).Send(notice)
}
// findTownRoot walks up directories to find the town root.
+172
View File
@@ -0,0 +1,172 @@
package refinery
import (
"encoding/json"
"os"
"path/filepath"
"testing"
"time"
"github.com/steveyegge/gastown/internal/rig"
)
// setupTestManager builds a Manager for a rig named "testrig" rooted in a
// fresh temporary directory that already contains a .gastown subdirectory.
// It returns the manager together with the rig path.
func setupTestManager(t *testing.T) (*Manager, string) {
	t.Helper()

	rigPath := filepath.Join(t.TempDir(), "testrig")
	stateDir := filepath.Join(rigPath, ".gastown")
	if err := os.MkdirAll(stateDir, 0755); err != nil {
		t.Fatalf("mkdir .gastown: %v", err)
	}

	testRig := &rig.Rig{Name: "testrig", Path: rigPath}
	return NewManager(testRig), rigPath
}
// TestManager_GetMR registers one pending MR and checks that GetMR returns it
// by ID and reports ErrMRNotFound for an unknown ID.
func TestManager_GetMR(t *testing.T) {
	mgr, _ := setupTestManager(t)

	// Seed the pending queue with a single MR.
	seeded := &MergeRequest{
		ID:      "gt-mr-abc123",
		Branch:  "polecat/Toast/gt-xyz",
		Worker:  "Toast",
		IssueID: "gt-xyz",
		Status:  MROpen,
		Error:   "test failure",
	}
	if err := mgr.RegisterMR(seeded); err != nil {
		t.Fatalf("RegisterMR: %v", err)
	}

	t.Run("find existing MR", func(t *testing.T) {
		got, err := mgr.GetMR("gt-mr-abc123")
		if err != nil {
			t.Errorf("GetMR() unexpected error: %v", err)
		}
		if got == nil {
			t.Fatal("GetMR() returned nil")
		}
		if got.ID != seeded.ID {
			t.Errorf("GetMR() ID = %s, want %s", got.ID, seeded.ID)
		}
	})

	t.Run("MR not found", func(t *testing.T) {
		if _, err := mgr.GetMR("nonexistent-mr"); err != ErrMRNotFound {
			t.Errorf("GetMR() error = %v, want %v", err, ErrMRNotFound)
		}
	})
}
// TestManager_Retry covers the three outcomes of Retry without immediate
// processing: a failed MR has its error cleared, an MR with no error yields
// ErrMRNotFailed, and an unknown ID yields ErrMRNotFound.
func TestManager_Retry(t *testing.T) {
	t.Run("retry failed MR clears error", func(t *testing.T) {
		mgr, _ := setupTestManager(t)

		// Create a failed MR
		mr := &MergeRequest{
			ID:     "gt-mr-failed",
			Branch: "polecat/Toast/gt-xyz",
			Worker: "Toast",
			Status: MROpen,
			Error:  "merge conflict",
		}
		if err := mgr.RegisterMR(mr); err != nil {
			t.Fatalf("RegisterMR: %v", err)
		}

		// Retry without processing
		err := mgr.Retry("gt-mr-failed", false)
		if err != nil {
			t.Errorf("Retry() unexpected error: %v", err)
		}

		// Verify error was cleared. Stop on a failed or empty lookup so that
		// it is reported as a test failure and not a nil-pointer panic.
		found, err := mgr.GetMR("gt-mr-failed")
		if err != nil {
			t.Fatalf("GetMR() after Retry: %v", err)
		}
		if found == nil {
			t.Fatal("GetMR() after Retry returned nil")
		}
		if found.Error != "" {
			t.Errorf("Retry() error not cleared, got %s", found.Error)
		}
	})

	t.Run("retry non-failed MR fails", func(t *testing.T) {
		mgr, _ := setupTestManager(t)

		// Create a successful MR (no error)
		mr := &MergeRequest{
			ID:     "gt-mr-success",
			Branch: "polecat/Toast/gt-abc",
			Worker: "Toast",
			Status: MROpen,
			Error:  "", // No error
		}
		if err := mgr.RegisterMR(mr); err != nil {
			t.Fatalf("RegisterMR: %v", err)
		}

		err := mgr.Retry("gt-mr-success", false)
		if err != ErrMRNotFailed {
			t.Errorf("Retry() error = %v, want %v", err, ErrMRNotFailed)
		}
	})

	t.Run("retry nonexistent MR fails", func(t *testing.T) {
		mgr, _ := setupTestManager(t)

		err := mgr.Retry("nonexistent", false)
		if err != ErrMRNotFound {
			t.Errorf("Retry() error = %v, want %v", err, ErrMRNotFound)
		}
	})
}
// TestManager_RegisterMR registers an MR and then reads
// .gastown/refinery.json directly to confirm the MR was written to disk under
// its ID with the expected worker.
func TestManager_RegisterMR(t *testing.T) {
	mgr, rigPath := setupTestManager(t)

	submitted := &MergeRequest{
		ID:           "gt-mr-new",
		Branch:       "polecat/Cheedo/gt-123",
		Worker:       "Cheedo",
		IssueID:      "gt-123",
		TargetBranch: "main",
		CreatedAt:    time.Now(),
		Status:       MROpen,
	}
	if err := mgr.RegisterMR(submitted); err != nil {
		t.Fatalf("RegisterMR: %v", err)
	}

	// Inspect the state file itself, bypassing the manager.
	raw, err := os.ReadFile(filepath.Join(rigPath, ".gastown", "refinery.json"))
	if err != nil {
		t.Fatalf("reading state file: %v", err)
	}

	var onDisk Refinery
	if err := json.Unmarshal(raw, &onDisk); err != nil {
		t.Fatalf("unmarshal state: %v", err)
	}
	if onDisk.PendingMRs == nil {
		t.Fatal("PendingMRs is nil")
	}

	stored, present := onDisk.PendingMRs["gt-mr-new"]
	if !present {
		t.Fatal("MR not found in PendingMRs")
	}
	if stored.Worker != "Cheedo" {
		t.Errorf("saved MR worker = %s, want Cheedo", stored.Worker)
	}
}
+39
View File
@@ -38,6 +38,10 @@ type Refinery struct {
// CurrentMR is the merge request currently being processed.
CurrentMR *MergeRequest `json:"current_mr,omitempty"`
// PendingMRs tracks merge requests that have been submitted.
// Key is the MR ID.
PendingMRs map[string]*MergeRequest `json:"pending_mrs,omitempty"`
// LastMergeAt is when the last successful merge happened.
LastMergeAt *time.Time `json:"last_merge_at,omitempty"`
@@ -111,6 +115,41 @@ const (
)
// MergeConfig contains configuration for the merge process.
//
// The JSON tags are the keys read from the "merge_queue" object of
// .gastown/config.json by getMergeConfig. The defaults listed below are the
// values returned by DefaultMergeConfig.
type MergeConfig struct {
	// RunTests controls whether tests are run after merge.
	// Default: true
	RunTests bool `json:"run_tests"`
	// TestCommand is the command to run for testing.
	// Default: "go test ./..."
	TestCommand string `json:"test_command"`
	// DeleteMergedBranches controls whether merged branches are deleted.
	// Default: true
	DeleteMergedBranches bool `json:"delete_merged_branches"`
	// PushRetryCount is the number of times to retry a failed push, in
	// addition to the initial attempt. A value of 0 loaded from the config
	// file is replaced with the default by getMergeConfig.
	// Default: 3
	PushRetryCount int `json:"push_retry_count"`
	// PushRetryDelayMs is the base delay between push retries in milliseconds.
	// Each retry doubles the delay (exponential backoff). A value of 0 loaded
	// from the config file is replaced with the default by getMergeConfig.
	// Default: 1000
	PushRetryDelayMs int `json:"push_retry_delay_ms"`
}
// DefaultMergeConfig returns the merge configuration used when nothing is
// configured: tests enabled with "go test ./...", merged branches deleted,
// and up to 3 push retries starting from a 1000 ms delay.
func DefaultMergeConfig() MergeConfig {
	var cfg MergeConfig

	cfg.RunTests = true
	cfg.TestCommand = "go test ./..."
	cfg.DeleteMergedBranches = true
	cfg.PushRetryCount = 3
	cfg.PushRetryDelayMs = 1000

	return cfg
}
// RefineryStats contains cumulative refinery statistics.
type RefineryStats struct {
// TotalMerged is the total number of successful merges.