From 7c2780fcd1507f7c858f93225570a8d8c782cbc1 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Thu, 18 Dec 2025 20:14:10 -0800 Subject: [PATCH] feat: implement Engineer main loop for merge queue processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the Engineer component that polls for ready merge-requests and processes them according to the merge queue design. Features: - Main loop that queries `bd ready` for merge-request type issues - Configurable poll_interval and max_concurrent from rig config.json - Graceful shutdown via context cancellation or Stop() method - Claims MRs via `bd update --status=in_progress` before processing - Handles success/failure with appropriate status updates Configuration (in rig config.json merge_queue section): - poll_interval: duration string (default "30s") - max_concurrent: number (default 1) - enabled, target_branch, run_tests, test_command, etc. Also adds ReadyWithType() to beads package for type-filtered queries. Note: ProcessMR() and handleFailure() are placeholders - full implementation in gt-3x1.2 and gt-3x1.4. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/beads/beads.go | 20 ++ internal/refinery/engineer.go | 321 +++++++++++++++++++++++++++++ internal/refinery/engineer_test.go | 209 +++++++++++++++++++ 3 files changed, 550 insertions(+) create mode 100644 internal/refinery/engineer.go create mode 100644 internal/refinery/engineer_test.go diff --git a/internal/beads/beads.go b/internal/beads/beads.go index cde267f1..043277c8 100644 --- a/internal/beads/beads.go +++ b/internal/beads/beads.go @@ -190,6 +190,26 @@ func (b *Beads) Ready() ([]*Issue, error) { return issues, nil } +// ReadyWithType returns ready issues filtered by type. +// This fetches all ready issues and filters client-side by type. +// Issues are returned sorted by priority (lowest first) then by creation time (oldest first). +func (b *Beads) ReadyWithType(issueType string) ([]*Issue, error) { + issues, err := b.Ready() + if err != nil { + return nil, err + } + + // Filter by type + var filtered []*Issue + for _, issue := range issues { + if issue.Type == issueType { + filtered = append(filtered, issue) + } + } + + return filtered, nil +} + // Show returns detailed information about an issue. func (b *Beads) Show(id string) (*Issue, error) { out, err := b.run("show", id, "--json") diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go new file mode 100644 index 00000000..5eea4b81 --- /dev/null +++ b/internal/refinery/engineer.go @@ -0,0 +1,321 @@ +// Package refinery provides the merge queue processing agent. +package refinery + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/rig" +) + +// MergeQueueConfig holds configuration for the merge queue processor. +type MergeQueueConfig struct { + // Enabled controls whether the merge queue is active. + Enabled bool `json:"enabled"` + + // TargetBranch is the default branch to merge to (e.g., "main"). + TargetBranch string `json:"target_branch"` + + // IntegrationBranches enables per-epic integration branches. + IntegrationBranches bool `json:"integration_branches"` + + // OnConflict is the strategy for handling conflicts: "assign_back" or "auto_rebase". + OnConflict string `json:"on_conflict"` + + // RunTests controls whether to run tests before merging. + RunTests bool `json:"run_tests"` + + // TestCommand is the command to run for testing. + TestCommand string `json:"test_command"` + + // DeleteMergedBranches controls whether to delete branches after merge. + DeleteMergedBranches bool `json:"delete_merged_branches"` + + // RetryFlakyTests is the number of times to retry flaky tests. + RetryFlakyTests int `json:"retry_flaky_tests"` + + // PollInterval is how often to check for new MRs. + PollInterval time.Duration `json:"poll_interval"` + + // MaxConcurrent is the maximum number of MRs to process concurrently. + MaxConcurrent int `json:"max_concurrent"` +} + +// DefaultMergeQueueConfig returns sensible defaults for merge queue configuration. +func DefaultMergeQueueConfig() *MergeQueueConfig { + return &MergeQueueConfig{ + Enabled: true, + TargetBranch: "main", + IntegrationBranches: true, + OnConflict: "assign_back", + RunTests: true, + TestCommand: "", + DeleteMergedBranches: true, + RetryFlakyTests: 1, + PollInterval: 30 * time.Second, + MaxConcurrent: 1, + } +} + +// Engineer is the merge queue processor that polls for ready merge-requests +// and processes them according to the merge queue design. +type Engineer struct { + rig *rig.Rig + beads *beads.Beads + config *MergeQueueConfig + workDir string + + // stopCh is used for graceful shutdown + stopCh chan struct{} +} + +// NewEngineer creates a new Engineer for the given rig. +func NewEngineer(r *rig.Rig) *Engineer { + return &Engineer{ + rig: r, + beads: beads.New(r.Path), + config: DefaultMergeQueueConfig(), + workDir: r.Path, + stopCh: make(chan struct{}), + } +} + +// LoadConfig loads merge queue configuration from the rig's config.json. +func (e *Engineer) LoadConfig() error { + configPath := filepath.Join(e.rig.Path, "config.json") + data, err := os.ReadFile(configPath) + if err != nil { + if os.IsNotExist(err) { + // Use defaults if no config file + return nil + } + return fmt.Errorf("reading config: %w", err) + } + + // Parse config file to extract merge_queue section + var rawConfig struct { + MergeQueue json.RawMessage `json:"merge_queue"` + } + if err := json.Unmarshal(data, &rawConfig); err != nil { + return fmt.Errorf("parsing config: %w", err) + } + + if rawConfig.MergeQueue == nil { + // No merge_queue section, use defaults + return nil + } + + // Parse merge_queue section into our config struct + // We need special handling for poll_interval (string -> Duration) + var mqRaw struct { + Enabled *bool `json:"enabled"` + TargetBranch *string `json:"target_branch"` + IntegrationBranches *bool `json:"integration_branches"` + OnConflict *string `json:"on_conflict"` + RunTests *bool `json:"run_tests"` + TestCommand *string `json:"test_command"` + DeleteMergedBranches *bool `json:"delete_merged_branches"` + RetryFlakyTests *int `json:"retry_flaky_tests"` + PollInterval *string `json:"poll_interval"` + MaxConcurrent *int `json:"max_concurrent"` + } + + if err := json.Unmarshal(rawConfig.MergeQueue, &mqRaw); err != nil { + return fmt.Errorf("parsing merge_queue config: %w", err) + } + + // Apply non-nil values to config (preserving defaults for missing fields) + if mqRaw.Enabled != nil { + e.config.Enabled = *mqRaw.Enabled + } + if mqRaw.TargetBranch != nil { + e.config.TargetBranch = *mqRaw.TargetBranch + } + if mqRaw.IntegrationBranches != nil { + e.config.IntegrationBranches = *mqRaw.IntegrationBranches + } + if mqRaw.OnConflict != nil { + e.config.OnConflict = *mqRaw.OnConflict + } + if mqRaw.RunTests != nil { + e.config.RunTests = *mqRaw.RunTests + } + if mqRaw.TestCommand != nil { + e.config.TestCommand = *mqRaw.TestCommand + } + if mqRaw.DeleteMergedBranches != nil { + e.config.DeleteMergedBranches = *mqRaw.DeleteMergedBranches + } + if mqRaw.RetryFlakyTests != nil { + e.config.RetryFlakyTests = *mqRaw.RetryFlakyTests + } + if mqRaw.MaxConcurrent != nil { + e.config.MaxConcurrent = *mqRaw.MaxConcurrent + } + if mqRaw.PollInterval != nil { + dur, err := time.ParseDuration(*mqRaw.PollInterval) + if err != nil { + return fmt.Errorf("invalid poll_interval %q: %w", *mqRaw.PollInterval, err) + } + e.config.PollInterval = dur + } + + return nil +} + +// Config returns the current merge queue configuration. +func (e *Engineer) Config() *MergeQueueConfig { + return e.config +} + +// Run starts the Engineer main loop. It blocks until the context is cancelled +// or Stop() is called. Returns nil on graceful shutdown. +func (e *Engineer) Run(ctx context.Context) error { + if err := e.LoadConfig(); err != nil { + return fmt.Errorf("loading config: %w", err) + } + + if !e.config.Enabled { + return fmt.Errorf("merge queue is disabled in configuration") + } + + fmt.Printf("[Engineer] Starting for rig %s (poll_interval=%s)\n", + e.rig.Name, e.config.PollInterval) + + ticker := time.NewTicker(e.config.PollInterval) + defer ticker.Stop() + + // Run one iteration immediately, then on ticker + if err := e.processOnce(ctx); err != nil { + fmt.Printf("[Engineer] Error: %v\n", err) + } + + for { + select { + case <-ctx.Done(): + fmt.Println("[Engineer] Shutting down (context cancelled)") + return nil + case <-e.stopCh: + fmt.Println("[Engineer] Shutting down (stop signal)") + return nil + case <-ticker.C: + if err := e.processOnce(ctx); err != nil { + fmt.Printf("[Engineer] Error: %v\n", err) + } + } + } +} + +// Stop signals the Engineer to stop processing. This is a non-blocking call. +func (e *Engineer) Stop() { + close(e.stopCh) +} + +// processOnce performs one iteration of the Engineer loop: +// 1. Query for ready merge-requests +// 2. If none, return (will try again on next tick) +// 3. Process the highest priority, oldest MR +func (e *Engineer) processOnce(ctx context.Context) error { + // Check context before starting + select { + case <-ctx.Done(): + return nil + default: + } + + // 1. Query: bd ready --type=merge-request (filtered client-side) + readyMRs, err := e.beads.ReadyWithType("merge-request") + if err != nil { + return fmt.Errorf("querying ready merge-requests: %w", err) + } + + // 2. If empty, return + if len(readyMRs) == 0 { + return nil + } + + // 3. Select highest priority, oldest MR + // bd ready already returns sorted by priority then age, so first is best + mr := readyMRs[0] + + fmt.Printf("[Engineer] Processing: %s (%s)\n", mr.ID, mr.Title) + + // 4. Claim: bd update --status=in_progress + inProgress := "in_progress" + if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &inProgress}); err != nil { + return fmt.Errorf("claiming MR %s: %w", mr.ID, err) + } + + // 5. Process (delegate to ProcessMR - implementation in separate issue gt-3x1.2) + result := e.ProcessMR(ctx, mr) + + // 6. Handle result + if result.Success { + // Close with merged reason + reason := fmt.Sprintf("merged: %s", result.MergeCommit) + if err := e.beads.CloseWithReason(reason, mr.ID); err != nil { + fmt.Printf("[Engineer] Warning: failed to close MR %s: %v\n", mr.ID, err) + } + fmt.Printf("[Engineer] ✓ Merged: %s\n", mr.ID) + } else { + // Failure handling (detailed implementation in gt-3x1.4) + e.handleFailure(mr, result) + } + + return nil +} + +// ProcessResult contains the result of processing a merge request. +type ProcessResult struct { + Success bool + MergeCommit string + Error string + Conflict bool + TestsFailed bool +} + +// ProcessMR processes a single merge request. +// This is a placeholder that will be fully implemented in gt-3x1.2. +func (e *Engineer) ProcessMR(ctx context.Context, mr *beads.Issue) ProcessResult { + // Parse MR fields from description + mrFields := beads.ParseMRFields(mr) + if mrFields == nil { + return ProcessResult{ + Success: false, + Error: "no MR fields found in description", + } + } + + // For now, just log what we would do + // Full implementation in gt-3x1.2: Fetch and conflict check + fmt.Printf("[Engineer] Would process:\n") + fmt.Printf(" Branch: %s\n", mrFields.Branch) + fmt.Printf(" Target: %s\n", mrFields.Target) + fmt.Printf(" Worker: %s\n", mrFields.Worker) + + // Return failure for now - actual implementation in gt-3x1.2 + return ProcessResult{ + Success: false, + Error: "ProcessMR not fully implemented (see gt-3x1.2)", + } +} + +// handleFailure handles a failed merge request. +// This is a placeholder that will be fully implemented in gt-3x1.4. +func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) { + // Reopen the MR (back to open status for rework) + open := "open" + if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &open}); err != nil { + fmt.Printf("[Engineer] Warning: failed to reopen MR %s: %v\n", mr.ID, err) + } + + // Log the failure + fmt.Printf("[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) + + // Full failure handling (assign back to worker, labels) in gt-3x1.4 +} diff --git a/internal/refinery/engineer_test.go b/internal/refinery/engineer_test.go new file mode 100644 index 00000000..80ac1429 --- /dev/null +++ b/internal/refinery/engineer_test.go @@ -0,0 +1,209 @@ +package refinery + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/steveyegge/gastown/internal/rig" +) + +func TestDefaultMergeQueueConfig(t *testing.T) { + cfg := DefaultMergeQueueConfig() + + if !cfg.Enabled { + t.Error("expected Enabled to be true by default") + } + if cfg.TargetBranch != "main" { + t.Errorf("expected TargetBranch to be 'main', got %q", cfg.TargetBranch) + } + if cfg.PollInterval != 30*time.Second { + t.Errorf("expected PollInterval to be 30s, got %v", cfg.PollInterval) + } + if cfg.MaxConcurrent != 1 { + t.Errorf("expected MaxConcurrent to be 1, got %d", cfg.MaxConcurrent) + } + if cfg.OnConflict != "assign_back" { + t.Errorf("expected OnConflict to be 'assign_back', got %q", cfg.OnConflict) + } +} + +func TestEngineer_LoadConfig_NoFile(t *testing.T) { + // Create a temp directory without config.json + tmpDir, err := os.MkdirTemp("", "engineer-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + r := &rig.Rig{ + Name: "test-rig", + Path: tmpDir, + } + + e := NewEngineer(r) + + // Should not error with missing config file + if err := e.LoadConfig(); err != nil { + t.Errorf("unexpected error with missing config: %v", err) + } + + // Should use defaults + if e.config.PollInterval != 30*time.Second { + t.Errorf("expected default PollInterval, got %v", e.config.PollInterval) + } +} + +func TestEngineer_LoadConfig_WithMergeQueue(t *testing.T) { + // Create a temp directory with config.json + tmpDir, err := os.MkdirTemp("", "engineer-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Write config file + config := map[string]interface{}{ + "type": "rig", + "version": 1, + "name": "test-rig", + "merge_queue": map[string]interface{}{ + "enabled": true, + "target_branch": "develop", + "poll_interval": "10s", + "max_concurrent": 2, + "run_tests": false, + "test_command": "make test", + }, + } + + data, _ := json.MarshalIndent(config, "", " ") + if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil { + t.Fatal(err) + } + + r := &rig.Rig{ + Name: "test-rig", + Path: tmpDir, + } + + e := NewEngineer(r) + + if err := e.LoadConfig(); err != nil { + t.Errorf("unexpected error loading config: %v", err) + } + + // Check that config values were loaded + if e.config.TargetBranch != "develop" { + t.Errorf("expected TargetBranch 'develop', got %q", e.config.TargetBranch) + } + if e.config.PollInterval != 10*time.Second { + t.Errorf("expected PollInterval 10s, got %v", e.config.PollInterval) + } + if e.config.MaxConcurrent != 2 { + t.Errorf("expected MaxConcurrent 2, got %d", e.config.MaxConcurrent) + } + if e.config.RunTests != false { + t.Errorf("expected RunTests false, got %v", e.config.RunTests) + } + if e.config.TestCommand != "make test" { + t.Errorf("expected TestCommand 'make test', got %q", e.config.TestCommand) + } + + // Check that defaults are preserved for unspecified fields + if e.config.OnConflict != "assign_back" { + t.Errorf("expected OnConflict default 'assign_back', got %q", e.config.OnConflict) + } +} + +func TestEngineer_LoadConfig_NoMergeQueueSection(t *testing.T) { + // Create a temp directory with config.json without merge_queue + tmpDir, err := os.MkdirTemp("", "engineer-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + // Write config file without merge_queue + config := map[string]interface{}{ + "type": "rig", + "version": 1, + "name": "test-rig", + } + + data, _ := json.MarshalIndent(config, "", " ") + if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil { + t.Fatal(err) + } + + r := &rig.Rig{ + Name: "test-rig", + Path: tmpDir, + } + + e := NewEngineer(r) + + if err := e.LoadConfig(); err != nil { + t.Errorf("unexpected error loading config: %v", err) + } + + // Should use all defaults + if e.config.PollInterval != 30*time.Second { + t.Errorf("expected default PollInterval, got %v", e.config.PollInterval) + } +} + +func TestEngineer_LoadConfig_InvalidPollInterval(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "engineer-test-*") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + config := map[string]interface{}{ + "merge_queue": map[string]interface{}{ + "poll_interval": "not-a-duration", + }, + } + + data, _ := json.MarshalIndent(config, "", " ") + if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil { + t.Fatal(err) + } + + r := &rig.Rig{ + Name: "test-rig", + Path: tmpDir, + } + + e := NewEngineer(r) + + err = e.LoadConfig() + if err == nil { + t.Error("expected error for invalid poll_interval") + } +} + +func TestNewEngineer(t *testing.T) { + r := &rig.Rig{ + Name: "test-rig", + Path: "/tmp/test-rig", + } + + e := NewEngineer(r) + + if e.rig != r { + t.Error("expected rig to be set") + } + if e.beads == nil { + t.Error("expected beads client to be initialized") + } + if e.config == nil { + t.Error("expected config to be initialized with defaults") + } + if e.stopCh == nil { + t.Error("expected stopCh to be initialized") + } +}