Merge polecat/Coma: gt mq status + Engineer main loop
This commit is contained in:
@@ -190,6 +190,26 @@ func (b *Beads) Ready() ([]*Issue, error) {
|
||||
return issues, nil
|
||||
}
|
||||
|
||||
// ReadyWithType returns ready issues filtered by type.
|
||||
// This fetches all ready issues and filters client-side by type.
|
||||
// Issues are returned sorted by priority (lowest first) then by creation time (oldest first).
|
||||
func (b *Beads) ReadyWithType(issueType string) ([]*Issue, error) {
|
||||
issues, err := b.Ready()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Filter by type
|
||||
var filtered []*Issue
|
||||
for _, issue := range issues {
|
||||
if issue.Type == issueType {
|
||||
filtered = append(filtered, issue)
|
||||
}
|
||||
}
|
||||
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// Show returns detailed information about an issue.
|
||||
func (b *Beads) Show(id string) (*Issue, error) {
|
||||
out, err := b.run("show", id, "--json")
|
||||
|
||||
@@ -40,6 +40,9 @@ var (
|
||||
mqListWorker string
|
||||
mqListEpic string
|
||||
mqListJSON bool
|
||||
|
||||
// Status command flags
|
||||
mqStatusJSON bool
|
||||
)
|
||||
|
||||
var mqCmd = &cobra.Command{
|
||||
@@ -127,6 +130,20 @@ Examples:
|
||||
RunE: runMQReject,
|
||||
}
|
||||
|
||||
var mqStatusCmd = &cobra.Command{
|
||||
Use: "status <id>",
|
||||
Short: "Show detailed merge request status",
|
||||
Long: `Display detailed information about a merge request.
|
||||
|
||||
Shows all MR fields, current status with timestamps, dependencies,
|
||||
blockers, and processing history.
|
||||
|
||||
Example:
|
||||
gt mq status gt-mr-abc123`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: runMqStatus,
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Submit flags
|
||||
mqSubmitCmd.Flags().StringVar(&mqSubmitBranch, "branch", "", "Source branch (default: current branch)")
|
||||
@@ -149,11 +166,15 @@ func init() {
|
||||
mqRejectCmd.Flags().BoolVar(&mqRejectNotify, "notify", false, "Send mail notification to worker")
|
||||
_ = mqRejectCmd.MarkFlagRequired("reason")
|
||||
|
||||
// Status flags
|
||||
mqStatusCmd.Flags().BoolVar(&mqStatusJSON, "json", false, "Output as JSON")
|
||||
|
||||
// Add subcommands
|
||||
mqCmd.AddCommand(mqSubmitCmd)
|
||||
mqCmd.AddCommand(mqRetryCmd)
|
||||
mqCmd.AddCommand(mqListCmd)
|
||||
mqCmd.AddCommand(mqRejectCmd)
|
||||
mqCmd.AddCommand(mqStatusCmd)
|
||||
|
||||
rootCmd.AddCommand(mqCmd)
|
||||
}
|
||||
@@ -611,3 +632,347 @@ func runMQReject(cmd *cobra.Command, args []string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MRStatusOutput is the JSON output structure for gt mq status.
|
||||
type MRStatusOutput struct {
|
||||
// Core issue fields
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Status string `json:"status"`
|
||||
Priority int `json:"priority"`
|
||||
Type string `json:"type"`
|
||||
Assignee string `json:"assignee,omitempty"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
ClosedAt string `json:"closed_at,omitempty"`
|
||||
|
||||
// MR-specific fields
|
||||
Branch string `json:"branch,omitempty"`
|
||||
Target string `json:"target,omitempty"`
|
||||
SourceIssue string `json:"source_issue,omitempty"`
|
||||
Worker string `json:"worker,omitempty"`
|
||||
Rig string `json:"rig,omitempty"`
|
||||
MergeCommit string `json:"merge_commit,omitempty"`
|
||||
CloseReason string `json:"close_reason,omitempty"`
|
||||
|
||||
// Dependencies
|
||||
DependsOn []DependencyInfo `json:"depends_on,omitempty"`
|
||||
Blocks []DependencyInfo `json:"blocks,omitempty"`
|
||||
}
|
||||
|
||||
// DependencyInfo represents a dependency or blocker.
|
||||
type DependencyInfo struct {
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Status string `json:"status"`
|
||||
Priority int `json:"priority"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
func runMqStatus(cmd *cobra.Command, args []string) error {
|
||||
mrID := args[0]
|
||||
|
||||
// Use current working directory for beads operations
|
||||
// (beads repos are per-rig, not per-workspace)
|
||||
workDir, err := os.Getwd()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting current directory: %w", err)
|
||||
}
|
||||
|
||||
// Initialize beads client
|
||||
bd := beads.New(workDir)
|
||||
|
||||
// Fetch the issue
|
||||
issue, err := bd.Show(mrID)
|
||||
if err != nil {
|
||||
if err == beads.ErrNotFound {
|
||||
return fmt.Errorf("merge request '%s' not found", mrID)
|
||||
}
|
||||
return fmt.Errorf("fetching merge request: %w", err)
|
||||
}
|
||||
|
||||
// Parse MR-specific fields from description
|
||||
mrFields := beads.ParseMRFields(issue)
|
||||
|
||||
// Build output structure
|
||||
output := MRStatusOutput{
|
||||
ID: issue.ID,
|
||||
Title: issue.Title,
|
||||
Status: issue.Status,
|
||||
Priority: issue.Priority,
|
||||
Type: issue.Type,
|
||||
Assignee: issue.Assignee,
|
||||
CreatedAt: issue.CreatedAt,
|
||||
UpdatedAt: issue.UpdatedAt,
|
||||
ClosedAt: issue.ClosedAt,
|
||||
}
|
||||
|
||||
// Add MR fields if present
|
||||
if mrFields != nil {
|
||||
output.Branch = mrFields.Branch
|
||||
output.Target = mrFields.Target
|
||||
output.SourceIssue = mrFields.SourceIssue
|
||||
output.Worker = mrFields.Worker
|
||||
output.Rig = mrFields.Rig
|
||||
output.MergeCommit = mrFields.MergeCommit
|
||||
output.CloseReason = mrFields.CloseReason
|
||||
}
|
||||
|
||||
// Add dependency info from the issue's Dependencies field
|
||||
for _, dep := range issue.Dependencies {
|
||||
output.DependsOn = append(output.DependsOn, DependencyInfo{
|
||||
ID: dep.ID,
|
||||
Title: dep.Title,
|
||||
Status: dep.Status,
|
||||
Priority: dep.Priority,
|
||||
Type: dep.Type,
|
||||
})
|
||||
}
|
||||
|
||||
// Add blocker info from the issue's Dependents field
|
||||
for _, dep := range issue.Dependents {
|
||||
output.Blocks = append(output.Blocks, DependencyInfo{
|
||||
ID: dep.ID,
|
||||
Title: dep.Title,
|
||||
Status: dep.Status,
|
||||
Priority: dep.Priority,
|
||||
Type: dep.Type,
|
||||
})
|
||||
}
|
||||
|
||||
// JSON output
|
||||
if mqStatusJSON {
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(output)
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
return printMqStatus(issue, mrFields)
|
||||
}
|
||||
|
||||
// printMqStatus prints detailed MR status in human-readable format.
|
||||
func printMqStatus(issue *beads.Issue, mrFields *beads.MRFields) error {
|
||||
// Header
|
||||
fmt.Printf("%s %s\n", style.Bold.Render("📋 Merge Request:"), issue.ID)
|
||||
fmt.Printf(" %s\n\n", issue.Title)
|
||||
|
||||
// Status section
|
||||
fmt.Printf("%s\n", style.Bold.Render("Status"))
|
||||
statusDisplay := formatStatus(issue.Status)
|
||||
fmt.Printf(" State: %s\n", statusDisplay)
|
||||
fmt.Printf(" Priority: P%d\n", issue.Priority)
|
||||
if issue.Type != "" {
|
||||
fmt.Printf(" Type: %s\n", issue.Type)
|
||||
}
|
||||
if issue.Assignee != "" {
|
||||
fmt.Printf(" Assignee: %s\n", issue.Assignee)
|
||||
}
|
||||
|
||||
// Timestamps
|
||||
fmt.Printf("\n%s\n", style.Bold.Render("Timeline"))
|
||||
if issue.CreatedAt != "" {
|
||||
fmt.Printf(" Created: %s %s\n", issue.CreatedAt, formatTimeAgo(issue.CreatedAt))
|
||||
}
|
||||
if issue.UpdatedAt != "" && issue.UpdatedAt != issue.CreatedAt {
|
||||
fmt.Printf(" Updated: %s %s\n", issue.UpdatedAt, formatTimeAgo(issue.UpdatedAt))
|
||||
}
|
||||
if issue.ClosedAt != "" {
|
||||
fmt.Printf(" Closed: %s %s\n", issue.ClosedAt, formatTimeAgo(issue.ClosedAt))
|
||||
}
|
||||
|
||||
// MR-specific fields
|
||||
if mrFields != nil {
|
||||
fmt.Printf("\n%s\n", style.Bold.Render("Merge Details"))
|
||||
if mrFields.Branch != "" {
|
||||
fmt.Printf(" Branch: %s\n", mrFields.Branch)
|
||||
}
|
||||
if mrFields.Target != "" {
|
||||
fmt.Printf(" Target: %s\n", mrFields.Target)
|
||||
}
|
||||
if mrFields.SourceIssue != "" {
|
||||
fmt.Printf(" Source Issue: %s\n", mrFields.SourceIssue)
|
||||
}
|
||||
if mrFields.Worker != "" {
|
||||
fmt.Printf(" Worker: %s\n", mrFields.Worker)
|
||||
}
|
||||
if mrFields.Rig != "" {
|
||||
fmt.Printf(" Rig: %s\n", mrFields.Rig)
|
||||
}
|
||||
if mrFields.MergeCommit != "" {
|
||||
fmt.Printf(" Merge Commit: %s\n", mrFields.MergeCommit)
|
||||
}
|
||||
if mrFields.CloseReason != "" {
|
||||
fmt.Printf(" Close Reason: %s\n", mrFields.CloseReason)
|
||||
}
|
||||
}
|
||||
|
||||
// Dependencies (what this MR is waiting on)
|
||||
if len(issue.Dependencies) > 0 {
|
||||
fmt.Printf("\n%s\n", style.Bold.Render("Waiting On"))
|
||||
for _, dep := range issue.Dependencies {
|
||||
statusIcon := getStatusIcon(dep.Status)
|
||||
fmt.Printf(" %s %s: %s %s\n",
|
||||
statusIcon,
|
||||
dep.ID,
|
||||
truncateString(dep.Title, 50),
|
||||
style.Dim.Render(fmt.Sprintf("[%s]", dep.Status)))
|
||||
}
|
||||
}
|
||||
|
||||
// Blockers (what's waiting on this MR)
|
||||
if len(issue.Dependents) > 0 {
|
||||
fmt.Printf("\n%s\n", style.Bold.Render("Blocking"))
|
||||
for _, dep := range issue.Dependents {
|
||||
statusIcon := getStatusIcon(dep.Status)
|
||||
fmt.Printf(" %s %s: %s %s\n",
|
||||
statusIcon,
|
||||
dep.ID,
|
||||
truncateString(dep.Title, 50),
|
||||
style.Dim.Render(fmt.Sprintf("[%s]", dep.Status)))
|
||||
}
|
||||
}
|
||||
|
||||
// Description (if present and not just MR fields)
|
||||
desc := getDescriptionWithoutMRFields(issue.Description)
|
||||
if desc != "" {
|
||||
fmt.Printf("\n%s\n", style.Bold.Render("Notes"))
|
||||
// Indent each line
|
||||
for _, line := range strings.Split(desc, "\n") {
|
||||
fmt.Printf(" %s\n", line)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// formatStatus formats the status with appropriate styling.
|
||||
func formatStatus(status string) string {
|
||||
switch status {
|
||||
case "open":
|
||||
return style.Info.Render("● open")
|
||||
case "in_progress":
|
||||
return style.Bold.Render("▶ in_progress")
|
||||
case "closed":
|
||||
return style.Dim.Render("✓ closed")
|
||||
default:
|
||||
return status
|
||||
}
|
||||
}
|
||||
|
||||
// getStatusIcon returns an icon for the given status.
|
||||
func getStatusIcon(status string) string {
|
||||
switch status {
|
||||
case "open":
|
||||
return "○"
|
||||
case "in_progress":
|
||||
return "▶"
|
||||
case "closed":
|
||||
return "✓"
|
||||
default:
|
||||
return "•"
|
||||
}
|
||||
}
|
||||
|
||||
// formatTimeAgo formats a timestamp as a relative time string.
|
||||
func formatTimeAgo(timestamp string) string {
|
||||
// Try parsing common formats
|
||||
formats := []string{
|
||||
time.RFC3339,
|
||||
"2006-01-02T15:04:05Z",
|
||||
"2006-01-02T15:04:05",
|
||||
"2006-01-02 15:04:05",
|
||||
"2006-01-02",
|
||||
}
|
||||
|
||||
var t time.Time
|
||||
var err error
|
||||
for _, format := range formats {
|
||||
t, err = time.Parse(format, timestamp)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return "" // Can't parse, return empty
|
||||
}
|
||||
|
||||
d := time.Since(t)
|
||||
if d < 0 {
|
||||
return style.Dim.Render("(in the future)")
|
||||
}
|
||||
|
||||
var ago string
|
||||
if d < time.Minute {
|
||||
ago = fmt.Sprintf("%ds ago", int(d.Seconds()))
|
||||
} else if d < time.Hour {
|
||||
ago = fmt.Sprintf("%dm ago", int(d.Minutes()))
|
||||
} else if d < 24*time.Hour {
|
||||
ago = fmt.Sprintf("%dh ago", int(d.Hours()))
|
||||
} else {
|
||||
ago = fmt.Sprintf("%dd ago", int(d.Hours()/24))
|
||||
}
|
||||
|
||||
return style.Dim.Render("(" + ago + ")")
|
||||
}
|
||||
|
||||
// truncateString truncates a string to maxLen, adding "..." if truncated.
|
||||
func truncateString(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
}
|
||||
if maxLen <= 3 {
|
||||
return s[:maxLen]
|
||||
}
|
||||
return s[:maxLen-3] + "..."
|
||||
}
|
||||
|
||||
// getDescriptionWithoutMRFields returns the description with MR field lines removed.
|
||||
func getDescriptionWithoutMRFields(description string) string {
|
||||
if description == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Known MR field keys (lowercase)
|
||||
mrKeys := map[string]bool{
|
||||
"branch": true,
|
||||
"target": true,
|
||||
"source_issue": true,
|
||||
"source-issue": true,
|
||||
"sourceissue": true,
|
||||
"worker": true,
|
||||
"rig": true,
|
||||
"merge_commit": true,
|
||||
"merge-commit": true,
|
||||
"mergecommit": true,
|
||||
"close_reason": true,
|
||||
"close-reason": true,
|
||||
"closereason": true,
|
||||
"type": true,
|
||||
}
|
||||
|
||||
var lines []string
|
||||
for _, line := range strings.Split(description, "\n") {
|
||||
trimmed := strings.TrimSpace(line)
|
||||
if trimmed == "" {
|
||||
lines = append(lines, line)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this is an MR field line
|
||||
colonIdx := strings.Index(trimmed, ":")
|
||||
if colonIdx != -1 {
|
||||
key := strings.ToLower(strings.TrimSpace(trimmed[:colonIdx]))
|
||||
if mrKeys[key] {
|
||||
continue // Skip MR field lines
|
||||
}
|
||||
}
|
||||
|
||||
lines = append(lines, line)
|
||||
}
|
||||
|
||||
// Trim leading/trailing blank lines
|
||||
result := strings.Join(lines, "\n")
|
||||
result = strings.TrimSpace(result)
|
||||
return result
|
||||
}
|
||||
|
||||
321
internal/refinery/engineer.go
Normal file
321
internal/refinery/engineer.go
Normal file
@@ -0,0 +1,321 @@
|
||||
// Package refinery provides the merge queue processing agent.
|
||||
package refinery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/gastown/internal/beads"
|
||||
"github.com/steveyegge/gastown/internal/rig"
|
||||
)
|
||||
|
||||
// MergeQueueConfig holds configuration for the merge queue processor.
|
||||
type MergeQueueConfig struct {
|
||||
// Enabled controls whether the merge queue is active.
|
||||
Enabled bool `json:"enabled"`
|
||||
|
||||
// TargetBranch is the default branch to merge to (e.g., "main").
|
||||
TargetBranch string `json:"target_branch"`
|
||||
|
||||
// IntegrationBranches enables per-epic integration branches.
|
||||
IntegrationBranches bool `json:"integration_branches"`
|
||||
|
||||
// OnConflict is the strategy for handling conflicts: "assign_back" or "auto_rebase".
|
||||
OnConflict string `json:"on_conflict"`
|
||||
|
||||
// RunTests controls whether to run tests before merging.
|
||||
RunTests bool `json:"run_tests"`
|
||||
|
||||
// TestCommand is the command to run for testing.
|
||||
TestCommand string `json:"test_command"`
|
||||
|
||||
// DeleteMergedBranches controls whether to delete branches after merge.
|
||||
DeleteMergedBranches bool `json:"delete_merged_branches"`
|
||||
|
||||
// RetryFlakyTests is the number of times to retry flaky tests.
|
||||
RetryFlakyTests int `json:"retry_flaky_tests"`
|
||||
|
||||
// PollInterval is how often to check for new MRs.
|
||||
PollInterval time.Duration `json:"poll_interval"`
|
||||
|
||||
// MaxConcurrent is the maximum number of MRs to process concurrently.
|
||||
MaxConcurrent int `json:"max_concurrent"`
|
||||
}
|
||||
|
||||
// DefaultMergeQueueConfig returns sensible defaults for merge queue configuration.
|
||||
func DefaultMergeQueueConfig() *MergeQueueConfig {
|
||||
return &MergeQueueConfig{
|
||||
Enabled: true,
|
||||
TargetBranch: "main",
|
||||
IntegrationBranches: true,
|
||||
OnConflict: "assign_back",
|
||||
RunTests: true,
|
||||
TestCommand: "",
|
||||
DeleteMergedBranches: true,
|
||||
RetryFlakyTests: 1,
|
||||
PollInterval: 30 * time.Second,
|
||||
MaxConcurrent: 1,
|
||||
}
|
||||
}
|
||||
|
||||
// Engineer is the merge queue processor that polls for ready merge-requests
|
||||
// and processes them according to the merge queue design.
|
||||
type Engineer struct {
|
||||
rig *rig.Rig
|
||||
beads *beads.Beads
|
||||
config *MergeQueueConfig
|
||||
workDir string
|
||||
|
||||
// stopCh is used for graceful shutdown
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewEngineer creates a new Engineer for the given rig.
|
||||
func NewEngineer(r *rig.Rig) *Engineer {
|
||||
return &Engineer{
|
||||
rig: r,
|
||||
beads: beads.New(r.Path),
|
||||
config: DefaultMergeQueueConfig(),
|
||||
workDir: r.Path,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// LoadConfig loads merge queue configuration from the rig's config.json.
|
||||
func (e *Engineer) LoadConfig() error {
|
||||
configPath := filepath.Join(e.rig.Path, "config.json")
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
// Use defaults if no config file
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("reading config: %w", err)
|
||||
}
|
||||
|
||||
// Parse config file to extract merge_queue section
|
||||
var rawConfig struct {
|
||||
MergeQueue json.RawMessage `json:"merge_queue"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &rawConfig); err != nil {
|
||||
return fmt.Errorf("parsing config: %w", err)
|
||||
}
|
||||
|
||||
if rawConfig.MergeQueue == nil {
|
||||
// No merge_queue section, use defaults
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse merge_queue section into our config struct
|
||||
// We need special handling for poll_interval (string -> Duration)
|
||||
var mqRaw struct {
|
||||
Enabled *bool `json:"enabled"`
|
||||
TargetBranch *string `json:"target_branch"`
|
||||
IntegrationBranches *bool `json:"integration_branches"`
|
||||
OnConflict *string `json:"on_conflict"`
|
||||
RunTests *bool `json:"run_tests"`
|
||||
TestCommand *string `json:"test_command"`
|
||||
DeleteMergedBranches *bool `json:"delete_merged_branches"`
|
||||
RetryFlakyTests *int `json:"retry_flaky_tests"`
|
||||
PollInterval *string `json:"poll_interval"`
|
||||
MaxConcurrent *int `json:"max_concurrent"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(rawConfig.MergeQueue, &mqRaw); err != nil {
|
||||
return fmt.Errorf("parsing merge_queue config: %w", err)
|
||||
}
|
||||
|
||||
// Apply non-nil values to config (preserving defaults for missing fields)
|
||||
if mqRaw.Enabled != nil {
|
||||
e.config.Enabled = *mqRaw.Enabled
|
||||
}
|
||||
if mqRaw.TargetBranch != nil {
|
||||
e.config.TargetBranch = *mqRaw.TargetBranch
|
||||
}
|
||||
if mqRaw.IntegrationBranches != nil {
|
||||
e.config.IntegrationBranches = *mqRaw.IntegrationBranches
|
||||
}
|
||||
if mqRaw.OnConflict != nil {
|
||||
e.config.OnConflict = *mqRaw.OnConflict
|
||||
}
|
||||
if mqRaw.RunTests != nil {
|
||||
e.config.RunTests = *mqRaw.RunTests
|
||||
}
|
||||
if mqRaw.TestCommand != nil {
|
||||
e.config.TestCommand = *mqRaw.TestCommand
|
||||
}
|
||||
if mqRaw.DeleteMergedBranches != nil {
|
||||
e.config.DeleteMergedBranches = *mqRaw.DeleteMergedBranches
|
||||
}
|
||||
if mqRaw.RetryFlakyTests != nil {
|
||||
e.config.RetryFlakyTests = *mqRaw.RetryFlakyTests
|
||||
}
|
||||
if mqRaw.MaxConcurrent != nil {
|
||||
e.config.MaxConcurrent = *mqRaw.MaxConcurrent
|
||||
}
|
||||
if mqRaw.PollInterval != nil {
|
||||
dur, err := time.ParseDuration(*mqRaw.PollInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid poll_interval %q: %w", *mqRaw.PollInterval, err)
|
||||
}
|
||||
e.config.PollInterval = dur
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Config returns the current merge queue configuration.
|
||||
func (e *Engineer) Config() *MergeQueueConfig {
|
||||
return e.config
|
||||
}
|
||||
|
||||
// Run starts the Engineer main loop. It blocks until the context is cancelled
|
||||
// or Stop() is called. Returns nil on graceful shutdown.
|
||||
func (e *Engineer) Run(ctx context.Context) error {
|
||||
if err := e.LoadConfig(); err != nil {
|
||||
return fmt.Errorf("loading config: %w", err)
|
||||
}
|
||||
|
||||
if !e.config.Enabled {
|
||||
return fmt.Errorf("merge queue is disabled in configuration")
|
||||
}
|
||||
|
||||
fmt.Printf("[Engineer] Starting for rig %s (poll_interval=%s)\n",
|
||||
e.rig.Name, e.config.PollInterval)
|
||||
|
||||
ticker := time.NewTicker(e.config.PollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run one iteration immediately, then on ticker
|
||||
if err := e.processOnce(ctx); err != nil {
|
||||
fmt.Printf("[Engineer] Error: %v\n", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("[Engineer] Shutting down (context cancelled)")
|
||||
return nil
|
||||
case <-e.stopCh:
|
||||
fmt.Println("[Engineer] Shutting down (stop signal)")
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := e.processOnce(ctx); err != nil {
|
||||
fmt.Printf("[Engineer] Error: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop signals the Engineer to stop processing. This is a non-blocking call.
|
||||
func (e *Engineer) Stop() {
|
||||
close(e.stopCh)
|
||||
}
|
||||
|
||||
// processOnce performs one iteration of the Engineer loop:
|
||||
// 1. Query for ready merge-requests
|
||||
// 2. If none, return (will try again on next tick)
|
||||
// 3. Process the highest priority, oldest MR
|
||||
func (e *Engineer) processOnce(ctx context.Context) error {
|
||||
// Check context before starting
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// 1. Query: bd ready --type=merge-request (filtered client-side)
|
||||
readyMRs, err := e.beads.ReadyWithType("merge-request")
|
||||
if err != nil {
|
||||
return fmt.Errorf("querying ready merge-requests: %w", err)
|
||||
}
|
||||
|
||||
// 2. If empty, return
|
||||
if len(readyMRs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 3. Select highest priority, oldest MR
|
||||
// bd ready already returns sorted by priority then age, so first is best
|
||||
mr := readyMRs[0]
|
||||
|
||||
fmt.Printf("[Engineer] Processing: %s (%s)\n", mr.ID, mr.Title)
|
||||
|
||||
// 4. Claim: bd update <id> --status=in_progress
|
||||
inProgress := "in_progress"
|
||||
if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &inProgress}); err != nil {
|
||||
return fmt.Errorf("claiming MR %s: %w", mr.ID, err)
|
||||
}
|
||||
|
||||
// 5. Process (delegate to ProcessMR - implementation in separate issue gt-3x1.2)
|
||||
result := e.ProcessMR(ctx, mr)
|
||||
|
||||
// 6. Handle result
|
||||
if result.Success {
|
||||
// Close with merged reason
|
||||
reason := fmt.Sprintf("merged: %s", result.MergeCommit)
|
||||
if err := e.beads.CloseWithReason(reason, mr.ID); err != nil {
|
||||
fmt.Printf("[Engineer] Warning: failed to close MR %s: %v\n", mr.ID, err)
|
||||
}
|
||||
fmt.Printf("[Engineer] ✓ Merged: %s\n", mr.ID)
|
||||
} else {
|
||||
// Failure handling (detailed implementation in gt-3x1.4)
|
||||
e.handleFailure(mr, result)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProcessResult contains the result of processing a merge request.
|
||||
type ProcessResult struct {
|
||||
Success bool
|
||||
MergeCommit string
|
||||
Error string
|
||||
Conflict bool
|
||||
TestsFailed bool
|
||||
}
|
||||
|
||||
// ProcessMR processes a single merge request.
|
||||
// This is a placeholder that will be fully implemented in gt-3x1.2.
|
||||
func (e *Engineer) ProcessMR(ctx context.Context, mr *beads.Issue) ProcessResult {
|
||||
// Parse MR fields from description
|
||||
mrFields := beads.ParseMRFields(mr)
|
||||
if mrFields == nil {
|
||||
return ProcessResult{
|
||||
Success: false,
|
||||
Error: "no MR fields found in description",
|
||||
}
|
||||
}
|
||||
|
||||
// For now, just log what we would do
|
||||
// Full implementation in gt-3x1.2: Fetch and conflict check
|
||||
fmt.Printf("[Engineer] Would process:\n")
|
||||
fmt.Printf(" Branch: %s\n", mrFields.Branch)
|
||||
fmt.Printf(" Target: %s\n", mrFields.Target)
|
||||
fmt.Printf(" Worker: %s\n", mrFields.Worker)
|
||||
|
||||
// Return failure for now - actual implementation in gt-3x1.2
|
||||
return ProcessResult{
|
||||
Success: false,
|
||||
Error: "ProcessMR not fully implemented (see gt-3x1.2)",
|
||||
}
|
||||
}
|
||||
|
||||
// handleFailure handles a failed merge request.
|
||||
// This is a placeholder that will be fully implemented in gt-3x1.4.
|
||||
func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
|
||||
// Reopen the MR (back to open status for rework)
|
||||
open := "open"
|
||||
if err := e.beads.Update(mr.ID, beads.UpdateOptions{Status: &open}); err != nil {
|
||||
fmt.Printf("[Engineer] Warning: failed to reopen MR %s: %v\n", mr.ID, err)
|
||||
}
|
||||
|
||||
// Log the failure
|
||||
fmt.Printf("[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)
|
||||
|
||||
// Full failure handling (assign back to worker, labels) in gt-3x1.4
|
||||
}
|
||||
209
internal/refinery/engineer_test.go
Normal file
209
internal/refinery/engineer_test.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package refinery
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/gastown/internal/rig"
|
||||
)
|
||||
|
||||
func TestDefaultMergeQueueConfig(t *testing.T) {
|
||||
cfg := DefaultMergeQueueConfig()
|
||||
|
||||
if !cfg.Enabled {
|
||||
t.Error("expected Enabled to be true by default")
|
||||
}
|
||||
if cfg.TargetBranch != "main" {
|
||||
t.Errorf("expected TargetBranch to be 'main', got %q", cfg.TargetBranch)
|
||||
}
|
||||
if cfg.PollInterval != 30*time.Second {
|
||||
t.Errorf("expected PollInterval to be 30s, got %v", cfg.PollInterval)
|
||||
}
|
||||
if cfg.MaxConcurrent != 1 {
|
||||
t.Errorf("expected MaxConcurrent to be 1, got %d", cfg.MaxConcurrent)
|
||||
}
|
||||
if cfg.OnConflict != "assign_back" {
|
||||
t.Errorf("expected OnConflict to be 'assign_back', got %q", cfg.OnConflict)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineer_LoadConfig_NoFile(t *testing.T) {
|
||||
// Create a temp directory without config.json
|
||||
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
r := &rig.Rig{
|
||||
Name: "test-rig",
|
||||
Path: tmpDir,
|
||||
}
|
||||
|
||||
e := NewEngineer(r)
|
||||
|
||||
// Should not error with missing config file
|
||||
if err := e.LoadConfig(); err != nil {
|
||||
t.Errorf("unexpected error with missing config: %v", err)
|
||||
}
|
||||
|
||||
// Should use defaults
|
||||
if e.config.PollInterval != 30*time.Second {
|
||||
t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineer_LoadConfig_WithMergeQueue(t *testing.T) {
|
||||
// Create a temp directory with config.json
|
||||
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Write config file
|
||||
config := map[string]interface{}{
|
||||
"type": "rig",
|
||||
"version": 1,
|
||||
"name": "test-rig",
|
||||
"merge_queue": map[string]interface{}{
|
||||
"enabled": true,
|
||||
"target_branch": "develop",
|
||||
"poll_interval": "10s",
|
||||
"max_concurrent": 2,
|
||||
"run_tests": false,
|
||||
"test_command": "make test",
|
||||
},
|
||||
}
|
||||
|
||||
data, _ := json.MarshalIndent(config, "", " ")
|
||||
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := &rig.Rig{
|
||||
Name: "test-rig",
|
||||
Path: tmpDir,
|
||||
}
|
||||
|
||||
e := NewEngineer(r)
|
||||
|
||||
if err := e.LoadConfig(); err != nil {
|
||||
t.Errorf("unexpected error loading config: %v", err)
|
||||
}
|
||||
|
||||
// Check that config values were loaded
|
||||
if e.config.TargetBranch != "develop" {
|
||||
t.Errorf("expected TargetBranch 'develop', got %q", e.config.TargetBranch)
|
||||
}
|
||||
if e.config.PollInterval != 10*time.Second {
|
||||
t.Errorf("expected PollInterval 10s, got %v", e.config.PollInterval)
|
||||
}
|
||||
if e.config.MaxConcurrent != 2 {
|
||||
t.Errorf("expected MaxConcurrent 2, got %d", e.config.MaxConcurrent)
|
||||
}
|
||||
if e.config.RunTests != false {
|
||||
t.Errorf("expected RunTests false, got %v", e.config.RunTests)
|
||||
}
|
||||
if e.config.TestCommand != "make test" {
|
||||
t.Errorf("expected TestCommand 'make test', got %q", e.config.TestCommand)
|
||||
}
|
||||
|
||||
// Check that defaults are preserved for unspecified fields
|
||||
if e.config.OnConflict != "assign_back" {
|
||||
t.Errorf("expected OnConflict default 'assign_back', got %q", e.config.OnConflict)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineer_LoadConfig_NoMergeQueueSection(t *testing.T) {
|
||||
// Create a temp directory with config.json without merge_queue
|
||||
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
// Write config file without merge_queue
|
||||
config := map[string]interface{}{
|
||||
"type": "rig",
|
||||
"version": 1,
|
||||
"name": "test-rig",
|
||||
}
|
||||
|
||||
data, _ := json.MarshalIndent(config, "", " ")
|
||||
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := &rig.Rig{
|
||||
Name: "test-rig",
|
||||
Path: tmpDir,
|
||||
}
|
||||
|
||||
e := NewEngineer(r)
|
||||
|
||||
if err := e.LoadConfig(); err != nil {
|
||||
t.Errorf("unexpected error loading config: %v", err)
|
||||
}
|
||||
|
||||
// Should use all defaults
|
||||
if e.config.PollInterval != 30*time.Second {
|
||||
t.Errorf("expected default PollInterval, got %v", e.config.PollInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineer_LoadConfig_InvalidPollInterval(t *testing.T) {
|
||||
tmpDir, err := os.MkdirTemp("", "engineer-test-*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
config := map[string]interface{}{
|
||||
"merge_queue": map[string]interface{}{
|
||||
"poll_interval": "not-a-duration",
|
||||
},
|
||||
}
|
||||
|
||||
data, _ := json.MarshalIndent(config, "", " ")
|
||||
if err := os.WriteFile(filepath.Join(tmpDir, "config.json"), data, 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := &rig.Rig{
|
||||
Name: "test-rig",
|
||||
Path: tmpDir,
|
||||
}
|
||||
|
||||
e := NewEngineer(r)
|
||||
|
||||
err = e.LoadConfig()
|
||||
if err == nil {
|
||||
t.Error("expected error for invalid poll_interval")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewEngineer(t *testing.T) {
|
||||
r := &rig.Rig{
|
||||
Name: "test-rig",
|
||||
Path: "/tmp/test-rig",
|
||||
}
|
||||
|
||||
e := NewEngineer(r)
|
||||
|
||||
if e.rig != r {
|
||||
t.Error("expected rig to be set")
|
||||
}
|
||||
if e.beads == nil {
|
||||
t.Error("expected beads client to be initialized")
|
||||
}
|
||||
if e.config == nil {
|
||||
t.Error("expected config to be initialized with defaults")
|
||||
}
|
||||
if e.stopCh == nil {
|
||||
t.Error("expected stopCh to be initialized")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user