feat: implement Engineer main loop for merge queue processing

Adds the Engineer component that polls for ready merge-requests and
processes them according to the merge queue design.

Features:
- Main loop that queries `bd ready` for merge-request type issues
- Configurable poll_interval and max_concurrent from rig config.json
- Graceful shutdown via context cancellation or Stop() method
- Claims MRs via `bd update --status=in_progress` before processing
- Handles success/failure with appropriate status updates

Configuration (in rig config.json merge_queue section):
- poll_interval: duration string (default "30s")
- max_concurrent: number (default 1)
- enabled, target_branch, run_tests, test_command, etc.

Also adds ReadyWithType() to beads package for type-filtered queries.

Note: ProcessMR() and handleFailure() are placeholders - full
implementation in gt-3x1.2 and gt-3x1.4.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-18 20:14:10 -08:00
parent ae0c532723
commit 7c2780fcd1
3 changed files with 550 additions and 0 deletions

View File

@@ -190,6 +190,26 @@ func (b *Beads) Ready() ([]*Issue, error) {
return issues, nil
}
// ReadyWithType returns ready issues filtered by type.
// This fetches all ready issues and filters client-side by type.
// Issues are returned sorted by priority (lowest first) then by creation time (oldest first).
func (b *Beads) ReadyWithType(issueType string) ([]*Issue, error) {
issues, err := b.Ready()
if err != nil {
return nil, err
}
// Filter by type
var filtered []*Issue
for _, issue := range issues {
if issue.Type == issueType {
filtered = append(filtered, issue)
}
}
return filtered, nil
}
// Show returns detailed information about an issue.
func (b *Beads) Show(id string) (*Issue, error) {
out, err := b.run("show", id, "--json")

View File

@@ -0,0 +1,321 @@
// Package refinery provides the merge queue processing agent.
package refinery
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/rig"
)
// MergeQueueConfig holds configuration for the merge queue processor.
type MergeQueueConfig struct {
// Enabled controls whether the merge queue is active.
Enabled bool `json:"enabled"`
// TargetBranch is the default branch to merge to (e.g., "main").
TargetBranch string `json:"target_branch"`
// IntegrationBranches enables per-epic integration branches.
IntegrationBranches bool `json:"integration_branches"`
// OnConflict is the strategy for handling conflicts: "assign_back" or "auto_rebase".
OnConflict string `json:"on_conflict"`
// RunTests controls whether to run tests before merging.
RunTests bool `json:"run_tests"`
// TestCommand is the command to run for testing.
TestCommand string `json:"test_command"`
// DeleteMergedBranches controls whether to delete branches after merge.
DeleteMergedBranches bool `json:"delete_merged_branches"`
// RetryFlakyTests is the number of times to retry flaky tests.
RetryFlakyTests int `json:"retry_flaky_tests"`
// PollInterval is how often to check for new MRs.
PollInterval time.Duration `json:"poll_interval"`
// MaxConcurrent is the maximum number of MRs to process concurrently.
MaxConcurrent int `json:"max_concurrent"`
}
// DefaultMergeQueueConfig returns sensible defaults for merge queue configuration.
func DefaultMergeQueueConfig() *MergeQueueConfig {
return &MergeQueueConfig{
Enabled: true,
TargetBranch: "main",
IntegrationBranches: true,
OnConflict: "assign_back",
RunTests: true,
TestCommand: "",
DeleteMergedBranches: true,
RetryFlakyTests: 1,
PollInterval: 30 * time.Second,
MaxConcurrent: 1,
}
}
// Engineer is the merge queue processor that polls for ready merge-requests
// and processes them according to the merge queue design.
type Engineer struct {
rig *rig.Rig
beads *beads.Beads
config *MergeQueueConfig
workDir string
// stopCh is used for graceful shutdown
stopCh chan struct{}
}
// NewEngineer creates a new Engineer for the given rig.
func NewEngineer(r *rig.Rig) *Engineer {
return &Engineer{
rig: r,
beads: beads.New(r.Path),
config: DefaultMergeQueueConfig(),
workDir: r.Path,
stopCh: make(chan struct{}),
}
}
// LoadConfig loads merge queue configuration from the rig's config.json.
func (e *Engineer) LoadConfig() error {
configPath := filepath.Join(e.rig.Path, "config.json")
data, err := os.ReadFile(configPath)
if err != nil {
if os.IsNotExist(err) {
// Use defaults if no config file
return nil
}
return fmt.Errorf("reading config: %w", err)
}
// Parse config file to extract merge_queue section
var rawConfig struct {
MergeQueue json.RawMessage `json:"merge_queue"`
}
if err := json.Unmarshal(data, &rawConfig); err != nil {
return fmt.Errorf("parsing config: %w", err)
}
if rawConfig.MergeQueue == nil {
// No merge_queue section, use defaults
return nil
}
// Parse merge_queue section into our config struct
// We need special handling for poll_interval (string -> Duration)
var mqRaw struct {
Enabled *bool `json:"enabled"`
TargetBranch *string `json:"target_branch"`
IntegrationBranches *bool `json:"integration_branches"`
OnConflict *string `json:"on_conflict"`
RunTests *bool `json:"run_tests"`
TestCommand *string `json:"test_command"`
DeleteMergedBranches *bool `json:"delete_merged_branches"`
RetryFlakyTests *int `json:"retry_flaky_tests"`
PollInterval *string `json:"poll_interval"`
MaxConcurrent *int `json:"max_concurrent"`
}
if err := json.Unmarshal(rawConfig.MergeQueue, &mqRaw); err != nil {
return fmt.Errorf("parsing merge_queue config: %w", err)
}
// Apply non-nil values to config (preserving defaults for missing fields)
if mqRaw.Enabled != nil {
e.config.Enabled = *mqRaw.Enabled
}
if mqRaw.TargetBranch != nil {
e.config.TargetBranch = *mqRaw.TargetBranch
}
if mqRaw.IntegrationBranches != nil {
e.config.IntegrationBranches = *mqRaw.IntegrationBranches
}
if mqRaw.OnConflict != nil {
e.config.OnConflict = *mqRaw.OnConflict
}
if mqRaw.RunTests != nil {
e.config.RunTests = *mqRaw.RunTests
}
if mqRaw.TestCommand != nil {
e.config.TestCommand = *mqRaw.TestCommand
}
if mqRaw.DeleteMergedBranches != nil {
e.config.DeleteMergedBranches = *mqRaw.DeleteMergedBranches
}
if mqRaw.RetryFlakyTests != nil {
e.config.RetryFlakyTests = *mqRaw.RetryFlakyTests
}
if mqRaw.MaxConcurrent != nil {
e.config.MaxConcurrent = *mqRaw.MaxConcurrent
}
if mqRaw.PollInterval != nil {
dur, err := time.ParseDuration(*mqRaw.PollInterval)
if err != nil {
return fmt.Errorf("invalid poll_interval %q: %w", *mqRaw.PollInterval, err)
}
e.config.PollInterval = dur
}
return nil
}
// Config returns the current merge queue configuration.
func (e *Engineer) Config() *MergeQueueConfig {
return e.config
}
// Run starts the Engineer main loop. It blocks until the context is cancelled
// or Stop() is called. Returns nil on graceful shutdown.
func (e *Engineer) Run(ctx context.Context) error {
if err := e.LoadConfig(); err != nil {
return fmt.Errorf("loading config: %w", err)
}
if !e.config.Enabled {
return fmt.Errorf("merge queue is disabled in configuration")
}
fmt.Printf("[Engineer] Starting for rig %s (poll_interval=%s)\n",
e.rig.Name, e.config.PollInterval)
ticker := time.NewTicker(e.config.PollInterval)
defer ticker.Stop()
// Run one iteration immediately, then on ticker
if err := e.processOnce(ctx); err != nil {
fmt.Printf("[Engineer] Error: %v\n", err)
}
for {
select {
case <-ctx.Done():
fmt.Println("[Engineer] Shutting down (context cancelled)")
return nil
case <-e.stopCh:
fmt.Println("[Engineer] Shutting down (stop signal)")
return nil
case <-ticker.C:
if err := e.processOnce(ctx); err != nil {
fmt.Printf("[Engineer] Error: %v\n", err)
}
}
}
}
// Stop signals the Engineer to stop processing. This is a non-blocking call.
func (e *Engineer) Stop() {
close(e.stopCh)
}
// processOnce performs one iteration of the Engineer loop:
// 1. Query for ready merge-requests
// 2. If none, return (will try again on next tick)
// 3. Process the highest priority, oldest MR
func (e *Engineer) processOnce(ctx context.Context) error {
// Check context before starting
select {
case <-ctx.Done():
return nil
default:
}
// 1. Query: bd ready --type=merge-request (filtered client-side)
readyMRs, err := e.beads.ReadyWithType("merge-request")
if err != nil {
return fmt.Errorf("querying ready merge-requests: %w", err)
}
// 2. If empty, return
if len(readyMRs) == 0 {
return nil
}
// 3. Select highest priority, oldest MR
// bd ready already returns sorted by priority then age, so first is best
mr := readyMRs[0]
fmt.Printf("[Engineer] Processing: %s (%s)\n", mr.ID, mr.Title)
// 4. Claim: bd update <id> --status=in_progress
inProgress := "in_progress"
if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &inProgress}); err != nil {
return fmt.Errorf("claiming MR %s: %w", mr.ID, err)
}
// 5. Process (delegate to ProcessMR - implementation in separate issue gt-3x1.2)
result := e.ProcessMR(ctx, mr)
// 6. Handle result
if result.Success {
// Close with merged reason
reason := fmt.Sprintf("merged: %s", result.MergeCommit)
if err := e.beads.CloseWithReason(reason, mr.ID); err != nil {
fmt.Printf("[Engineer] Warning: failed to close MR %s: %v\n", mr.ID, err)
}
fmt.Printf("[Engineer] ✓ Merged: %s\n", mr.ID)
} else {
// Failure handling (detailed implementation in gt-3x1.4)
e.handleFailure(mr, result)
}
return nil
}
// ProcessResult contains the result of processing a merge request.
type ProcessResult struct {
Success bool
MergeCommit string
Error string
Conflict bool
TestsFailed bool
}
// ProcessMR processes a single merge request.
// This is a placeholder that will be fully implemented in gt-3x1.2.
func (e *Engineer) ProcessMR(ctx context.Context, mr *beads.Issue) ProcessResult {
// Parse MR fields from description
mrFields := beads.ParseMRFields(mr)
if mrFields == nil {
return ProcessResult{
Success: false,
Error: "no MR fields found in description",
}
}
// For now, just log what we would do
// Full implementation in gt-3x1.2: Fetch and conflict check
fmt.Printf("[Engineer] Would process:\n")
fmt.Printf(" Branch: %s\n", mrFields.Branch)
fmt.Printf(" Target: %s\n", mrFields.Target)
fmt.Printf(" Worker: %s\n", mrFields.Worker)
// Return failure for now - actual implementation in gt-3x1.2
return ProcessResult{
Success: false,
Error: "ProcessMR not fully implemented (see gt-3x1.2)",
}
}
// handleFailure handles a failed merge request.
// This is a placeholder that will be fully implemented in gt-3x1.4.
func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
// Reopen the MR (back to open status for rework)
open := "open"
if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &open}); err != nil {
fmt.Printf("[Engineer] Warning: failed to reopen MR %s: %v\n", mr.ID, err)
}
// Log the failure
fmt.Printf("[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)
// Full failure handling (assign back to worker, labels) in gt-3x1.4
}

View File

@@ -0,0 +1,209 @@
package refinery
import (
"encoding/json"
"os"
"path/filepath"
"testing"
"time"
"github.com/steveyegge/gastown/internal/rig"
)
func TestDefaultMergeQueueConfig(t *testing.T) {
cfg := DefaultMergeQueueConfig()
if !cfg.Enabled {
t.Error("expected Enabled to be true by default")
}
if cfg.TargetBranch != "main" {
t.Errorf("expected TargetBranch to be 'main', got %q", cfg.TargetBranch)
}
if cfg.PollInterval != 30*time.Second {
t.Errorf("expected PollInterval to be 30s, got %v", cfg.PollInterval)
}
if cfg.MaxConcurrent != 1 {
t.Errorf("expected MaxConcurrent to be 1, got %d", cfg.MaxConcurrent)
}
if cfg.OnConflict != "assign_back" {
t.Errorf("expected OnConflict to be 'assign_back', got %q", cfg.OnConflict)
}
}
func TestEngineer_LoadConfig_NoFile(t *testing.T) {
// Create a temp directory without config.json
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
r := &rig.Rig{
Name: "test-rig",
Path: tmpDir,
}
e := NewEngineer(r)
// Should not error with missing config file
if err := e.LoadConfig(); err != nil {
t.Errorf("unexpected error with missing config: %v", err)
}
// Should use defaults
if e.config.PollInterval != 30*time.Second {
t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
}
}
func TestEngineer_LoadConfig_WithMergeQueue(t *testing.T) {
// Create a temp directory with config.json
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// Write config file
config := map[string]interface{}{
"type": "rig",
"version": 1,
"name": "test-rig",
"merge_queue": map[string]interface{}{
"enabled": true,
"target_branch": "develop",
"poll_interval": "10s",
"max_concurrent": 2,
"run_tests": false,
"test_command": "make test",
},
}
data, _ := json.MarshalIndent(config, "", " ")
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
t.Fatal(err)
}
r := &rig.Rig{
Name: "test-rig",
Path: tmpDir,
}
e := NewEngineer(r)
if err := e.LoadConfig(); err != nil {
t.Errorf("unexpected error loading config: %v", err)
}
// Check that config values were loaded
if e.config.TargetBranch != "develop" {
t.Errorf("expected TargetBranch 'develop', got %q", e.config.TargetBranch)
}
if e.config.PollInterval != 10*time.Second {
t.Errorf("expected PollInterval 10s, got %v", e.config.PollInterval)
}
if e.config.MaxConcurrent != 2 {
t.Errorf("expected MaxConcurrent 2, got %d", e.config.MaxConcurrent)
}
if e.config.RunTests != false {
t.Errorf("expected RunTests false, got %v", e.config.RunTests)
}
if e.config.TestCommand != "make test" {
t.Errorf("expected TestCommand 'make test', got %q", e.config.TestCommand)
}
// Check that defaults are preserved for unspecified fields
if e.config.OnConflict != "assign_back" {
t.Errorf("expected OnConflict default 'assign_back', got %q", e.config.OnConflict)
}
}
func TestEngineer_LoadConfig_NoMergeQueueSection(t *testing.T) {
// Create a temp directory with config.json without merge_queue
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
// Write config file without merge_queue
config := map[string]interface{}{
"type": "rig",
"version": 1,
"name": "test-rig",
}
data, _ := json.MarshalIndent(config, "", " ")
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
t.Fatal(err)
}
r := &rig.Rig{
Name: "test-rig",
Path: tmpDir,
}
e := NewEngineer(r)
if err := e.LoadConfig(); err != nil {
t.Errorf("unexpected error loading config: %v", err)
}
// Should use all defaults
if e.config.PollInterval != 30*time.Second {
t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
}
}
func TestEngineer_LoadConfig_InvalidPollInterval(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
config := map[string]interface{}{
"merge_queue": map[string]interface{}{
"poll_interval": "not-a-duration",
},
}
data, _ := json.MarshalIndent(config, "", " ")
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
t.Fatal(err)
}
r := &rig.Rig{
Name: "test-rig",
Path: tmpDir,
}
e := NewEngineer(r)
err = e.LoadConfig()
if err == nil {
t.Error("expected error for invalid poll_interval")
}
}
func TestNewEngineer(t *testing.T) {
r := &rig.Rig{
Name: "test-rig",
Path: "/tmp/test-rig",
}
e := NewEngineer(r)
if e.rig != r {
t.Error("expected rig to be set")
}
if e.beads == nil {
t.Error("expected beads client to be initialized")
}
if e.config == nil {
t.Error("expected config to be initialized with defaults")
}
if e.stopCh == nil {
t.Error("expected stopCh to be initialized")
}
}