feat(refinery): implement merge failure handling with labels and notifications
Add comprehensive failure handling for merge queue: - Add FailureType enum with conflict, tests_fail, build_fail, flaky_test, push_fail - Add handleFailure function that updates beads issue with labels and assignee - Add notifyWorkerFailure for type-specific failure notifications - Add retry logic for flaky tests (configurable via retry_flaky_tests) - Add pushWithRetry with exponential backoff for transient push failures - Add label support to beads UpdateOptions (AddLabels, RemoveLabels, SetLabels) Failure actions by type: - conflict: needs-rebase label, assign to worker - tests_fail/build_fail: needs-fix label, assign to worker - flaky_test: retry once, then treat as tests_fail - push_fail: retry with backoff, needs-retry label Closes gt-3x1.4 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/gastown/internal/beads"
|
||||
"github.com/steveyegge/gastown/internal/mail"
|
||||
"github.com/steveyegge/gastown/internal/rig"
|
||||
)
|
||||
@@ -294,14 +296,19 @@ func (m *Manager) ProcessQueue() error {
|
||||
|
||||
// MergeResult contains the result of a merge attempt.
|
||||
type MergeResult struct {
|
||||
Success bool
|
||||
Error string
|
||||
Conflict bool
|
||||
TestsFailed bool
|
||||
Success bool
|
||||
Error string
|
||||
FailureType FailureType
|
||||
RetryCount int // Number of retries attempted
|
||||
}
|
||||
|
||||
// ProcessMR processes a single merge request.
|
||||
func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
return m.processMRWithRetry(mr, 0)
|
||||
}
|
||||
|
||||
// processMRWithRetry processes a merge request with retry support.
|
||||
func (m *Manager) processMRWithRetry(mr *MergeRequest, retryCount int) MergeResult {
|
||||
ref, _ := m.loadState()
|
||||
|
||||
// Claim the MR (open → in_progress)
|
||||
@@ -311,12 +318,13 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
ref.CurrentMR = mr
|
||||
m.saveState(ref)
|
||||
|
||||
result := MergeResult{}
|
||||
result := MergeResult{RetryCount: retryCount}
|
||||
|
||||
// 1. Fetch the branch
|
||||
if err := m.gitRun("fetch", "origin", mr.Branch); err != nil {
|
||||
result.Error = fmt.Sprintf("fetch failed: %v", err)
|
||||
m.completeMR(mr, "", result.Error) // Reopen for retry
|
||||
result.FailureType = FailureFetch
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -324,7 +332,8 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
// First, checkout target
|
||||
if err := m.gitRun("checkout", mr.TargetBranch); err != nil {
|
||||
result.Error = fmt.Sprintf("checkout target failed: %v", err)
|
||||
m.completeMR(mr, "", result.Error) // Reopen for retry
|
||||
result.FailureType = FailureCheckout
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -339,17 +348,15 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
if err != nil {
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "CONFLICT") || strings.Contains(errStr, "conflict") {
|
||||
result.Conflict = true
|
||||
result.FailureType = FailureConflict
|
||||
result.Error = "merge conflict"
|
||||
// Abort the merge
|
||||
m.gitRun("merge", "--abort")
|
||||
m.completeMR(mr, "", "merge conflict - polecat must rebase") // Reopen for rebase
|
||||
// Notify worker about conflict
|
||||
m.notifyWorkerConflict(mr)
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
result.Error = fmt.Sprintf("merge failed: %v", err)
|
||||
m.completeMR(mr, "", result.Error) // Reopen for retry
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -357,21 +364,33 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
testCmd := m.getTestCommand()
|
||||
if testCmd != "" {
|
||||
if err := m.runTests(testCmd); err != nil {
|
||||
result.TestsFailed = true
|
||||
result.Error = fmt.Sprintf("tests failed: %v", err)
|
||||
// Reset to before merge
|
||||
m.gitRun("reset", "--hard", "HEAD~1")
|
||||
m.completeMR(mr, "", result.Error) // Reopen for fixes
|
||||
|
||||
// Check if this might be a flaky test (retry once)
|
||||
retryFlakyTests := m.getRetryFlakyTests()
|
||||
if retryCount < retryFlakyTests {
|
||||
log.Printf("[MQ] Test failure on attempt %d, retrying (may be flaky)...", retryCount+1)
|
||||
// Reopen the MR for retry
|
||||
mr.Reopen()
|
||||
return m.processMRWithRetry(mr, retryCount+1)
|
||||
}
|
||||
|
||||
result.FailureType = FailureTestsFail
|
||||
result.Error = fmt.Sprintf("tests failed: %v", err)
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Push
|
||||
if err := m.gitRun("push", "origin", mr.TargetBranch); err != nil {
|
||||
result.Error = fmt.Sprintf("push failed: %v", err)
|
||||
// 4. Push with retry for transient failures
|
||||
pushErr := m.pushWithRetry(mr.TargetBranch, 3)
|
||||
if pushErr != nil {
|
||||
result.Error = fmt.Sprintf("push failed: %v", pushErr)
|
||||
result.FailureType = FailurePushFail
|
||||
// Reset to before merge
|
||||
m.gitRun("reset", "--hard", "HEAD~1")
|
||||
m.completeMR(mr, "", result.Error) // Reopen for retry
|
||||
m.handleFailure(mr, result)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -388,6 +407,199 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
return result
|
||||
}
|
||||
|
||||
// pushWithRetry attempts to push with exponential backoff.
|
||||
func (m *Manager) pushWithRetry(branch string, maxRetries int) error {
|
||||
var lastErr error
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
if i > 0 {
|
||||
// Exponential backoff: 1s, 2s, 4s...
|
||||
backoff := time.Duration(1<<uint(i-1)) * time.Second
|
||||
log.Printf("[MQ] Push attempt %d failed, retrying in %v...", i, backoff)
|
||||
time.Sleep(backoff)
|
||||
}
|
||||
|
||||
lastErr = m.gitRun("push", "origin", branch)
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// handleFailure handles merge failures by updating the source issue and notifying the worker.
|
||||
func (m *Manager) handleFailure(mr *MergeRequest, result MergeResult) {
|
||||
// Log the failure details
|
||||
log.Printf("[MQ] Merge failed for %s: type=%s, error=%s, retries=%d",
|
||||
mr.Branch, result.FailureType, result.Error, result.RetryCount)
|
||||
|
||||
// Reopen the MR (in_progress → open)
|
||||
m.completeMR(mr, "", result.Error)
|
||||
|
||||
// Update the source issue in beads if we have an issue ID
|
||||
if mr.IssueID != "" && result.FailureType.ShouldAssignToWorker() {
|
||||
m.updateSourceIssue(mr, result)
|
||||
}
|
||||
|
||||
// Send notification to worker
|
||||
m.notifyWorkerFailure(mr, result)
|
||||
}
|
||||
|
||||
// updateSourceIssue updates the beads issue with failure info.
|
||||
func (m *Manager) updateSourceIssue(mr *MergeRequest, result MergeResult) {
|
||||
// Find the beads directory - use the rig's canonical beads
|
||||
beadsDir := m.findBeadsDir()
|
||||
if beadsDir == "" {
|
||||
log.Printf("[MQ] Warning: could not find beads directory to update issue %s", mr.IssueID)
|
||||
return
|
||||
}
|
||||
|
||||
bd := beads.New(beadsDir)
|
||||
|
||||
// Build update options
|
||||
opts := beads.UpdateOptions{}
|
||||
|
||||
// Set status back to open
|
||||
status := "open"
|
||||
opts.Status = &status
|
||||
|
||||
// Assign back to worker
|
||||
workerAddr := fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker)
|
||||
opts.Assignee = &workerAddr
|
||||
|
||||
// Add the failure label
|
||||
label := result.FailureType.FailureLabel()
|
||||
if label != "" {
|
||||
opts.AddLabels = []string{label}
|
||||
}
|
||||
|
||||
// Perform the update
|
||||
if err := bd.Update(mr.IssueID, opts); err != nil {
|
||||
log.Printf("[MQ] Warning: failed to update issue %s: %v", mr.IssueID, err)
|
||||
} else {
|
||||
log.Printf("[MQ] Updated issue %s: status=open, assignee=%s, label=%s",
|
||||
mr.IssueID, workerAddr, label)
|
||||
}
|
||||
}
|
||||
|
||||
// findBeadsDir locates the beads directory for this rig.
|
||||
func (m *Manager) findBeadsDir() string {
|
||||
// First try the rig's own .beads directory
|
||||
beadsPath := filepath.Join(m.rig.Path, ".beads")
|
||||
if _, err := os.Stat(beadsPath); err == nil {
|
||||
return m.rig.Path
|
||||
}
|
||||
|
||||
// Fall back to town root if we can find it
|
||||
townRoot := findTownRoot(m.rig.Path)
|
||||
if townRoot != "" {
|
||||
// Check if rig has a separate beads in polecats dir
|
||||
rigBeadsPath := filepath.Join(townRoot, m.rig.Name, ".beads")
|
||||
if _, err := os.Stat(rigBeadsPath); err == nil {
|
||||
return filepath.Join(townRoot, m.rig.Name)
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// notifyWorkerFailure sends a failure notification to the worker.
|
||||
func (m *Manager) notifyWorkerFailure(mr *MergeRequest, result MergeResult) {
|
||||
router := mail.NewRouter(m.workDir)
|
||||
|
||||
var subject, body string
|
||||
|
||||
switch result.FailureType {
|
||||
case FailureConflict:
|
||||
subject = "Merge conflict - rebase required"
|
||||
body = fmt.Sprintf(`Your branch %s has conflicts with %s.
|
||||
|
||||
Please rebase your changes:
|
||||
git fetch origin
|
||||
git rebase origin/%s
|
||||
git push -f
|
||||
|
||||
Then the Refinery will retry the merge.
|
||||
|
||||
Issue: %s
|
||||
Error: %s`,
|
||||
mr.Branch, mr.TargetBranch, mr.TargetBranch, mr.IssueID, result.Error)
|
||||
|
||||
case FailureTestsFail:
|
||||
subject = "Tests failed - fix required"
|
||||
body = fmt.Sprintf(`Your branch %s failed tests after merging with %s.
|
||||
|
||||
Please fix the failing tests and push your changes.
|
||||
|
||||
Issue: %s
|
||||
Error: %s
|
||||
Retries attempted: %d`,
|
||||
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error, result.RetryCount)
|
||||
|
||||
case FailureBuildFail:
|
||||
subject = "Build failed - fix required"
|
||||
body = fmt.Sprintf(`Your branch %s failed to build after merging with %s.
|
||||
|
||||
Please fix the build errors and push your changes.
|
||||
|
||||
Issue: %s
|
||||
Error: %s`,
|
||||
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error)
|
||||
|
||||
case FailurePushFail:
|
||||
subject = "Push failed - retrying"
|
||||
body = fmt.Sprintf(`Push to %s failed after merging your branch %s.
|
||||
|
||||
This may be a transient issue. The Refinery will retry.
|
||||
If this persists, please contact the team.
|
||||
|
||||
Issue: %s
|
||||
Error: %s`,
|
||||
mr.TargetBranch, mr.Branch, mr.IssueID, result.Error)
|
||||
|
||||
default:
|
||||
subject = "Merge failed"
|
||||
body = fmt.Sprintf(`Merge of branch %s to %s failed.
|
||||
|
||||
Issue: %s
|
||||
Error: %s
|
||||
|
||||
Please check the error and try again.`,
|
||||
mr.Branch, mr.TargetBranch, mr.IssueID, result.Error)
|
||||
}
|
||||
|
||||
msg := &mail.Message{
|
||||
From: fmt.Sprintf("%s/refinery", m.rig.Name),
|
||||
To: fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker),
|
||||
Subject: subject,
|
||||
Body: body,
|
||||
Priority: mail.PriorityHigh,
|
||||
}
|
||||
router.Send(msg)
|
||||
}
|
||||
|
||||
// getRetryFlakyTests returns the number of retries for flaky tests from config.
|
||||
func (m *Manager) getRetryFlakyTests() int {
|
||||
configPath := filepath.Join(m.rig.Path, ".gastown", "config.json")
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return 1 // Default: retry once
|
||||
}
|
||||
|
||||
var config struct {
|
||||
MergeQueue struct {
|
||||
RetryFlakyTests int `json:"retry_flaky_tests"`
|
||||
} `json:"merge_queue"`
|
||||
}
|
||||
if err := json.Unmarshal(data, &config); err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
if config.MergeQueue.RetryFlakyTests > 0 {
|
||||
return config.MergeQueue.RetryFlakyTests
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
// completeMR marks an MR as complete and updates stats.
|
||||
// For success, pass closeReason (e.g., CloseReasonMerged).
|
||||
// For failures that should return to open, pass empty closeReason.
|
||||
@@ -514,27 +726,6 @@ func formatAge(t time.Time) string {
|
||||
return fmt.Sprintf("%dd ago", int(d.Hours()/24))
|
||||
}
|
||||
|
||||
// notifyWorkerConflict sends a conflict notification to a polecat.
|
||||
func (m *Manager) notifyWorkerConflict(mr *MergeRequest) {
|
||||
router := mail.NewRouter(m.workDir)
|
||||
msg := &mail.Message{
|
||||
From: fmt.Sprintf("%s/refinery", m.rig.Name),
|
||||
To: fmt.Sprintf("%s/%s", m.rig.Name, mr.Worker),
|
||||
Subject: "Merge conflict - rebase required",
|
||||
Body: fmt.Sprintf(`Your branch %s has conflicts with %s.
|
||||
|
||||
Please rebase your changes:
|
||||
git fetch origin
|
||||
git rebase origin/%s
|
||||
git push -f
|
||||
|
||||
Then the Refinery will retry the merge.`,
|
||||
mr.Branch, mr.TargetBranch, mr.TargetBranch),
|
||||
Priority: mail.PriorityHigh,
|
||||
}
|
||||
router.Send(msg)
|
||||
}
|
||||
|
||||
// notifyWorkerMerged sends a success notification to a polecat.
|
||||
func (m *Manager) notifyWorkerMerged(mr *MergeRequest) {
|
||||
router := mail.NewRouter(m.workDir)
|
||||
|
||||
@@ -239,6 +239,59 @@ func (mr *MergeRequest) IsClosed() bool {
|
||||
return mr.Status == MRClosed
|
||||
}
|
||||
|
||||
// FailureType categorizes merge failures for appropriate handling.
|
||||
type FailureType string
|
||||
|
||||
const (
|
||||
// FailureNone indicates no failure (success).
|
||||
FailureNone FailureType = ""
|
||||
|
||||
// FailureConflict indicates merge conflicts with target branch.
|
||||
FailureConflict FailureType = "conflict"
|
||||
|
||||
// FailureTestsFail indicates tests failed after merge.
|
||||
FailureTestsFail FailureType = "tests_fail"
|
||||
|
||||
// FailureBuildFail indicates build failed after merge.
|
||||
FailureBuildFail FailureType = "build_fail"
|
||||
|
||||
// FailureFlakyTest indicates a potentially flaky test failure (may retry).
|
||||
FailureFlakyTest FailureType = "flaky_test"
|
||||
|
||||
// FailurePushFail indicates push to remote failed.
|
||||
FailurePushFail FailureType = "push_fail"
|
||||
|
||||
// FailureFetch indicates fetch of source branch failed.
|
||||
FailureFetch FailureType = "fetch_fail"
|
||||
|
||||
// FailureCheckout indicates checkout of target branch failed.
|
||||
FailureCheckout FailureType = "checkout_fail"
|
||||
)
|
||||
|
||||
// FailureLabel returns the beads label for this failure type.
|
||||
func (f FailureType) FailureLabel() string {
|
||||
switch f {
|
||||
case FailureConflict:
|
||||
return "needs-rebase"
|
||||
case FailureTestsFail, FailureBuildFail, FailureFlakyTest:
|
||||
return "needs-fix"
|
||||
case FailurePushFail:
|
||||
return "needs-retry"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// ShouldAssignToWorker returns true if this failure should be assigned back to the worker.
|
||||
func (f FailureType) ShouldAssignToWorker() bool {
|
||||
switch f {
|
||||
case FailureConflict, FailureTestsFail, FailureBuildFail, FailureFlakyTest:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsOpen returns true if the MR is in an open state (waiting for processing).
|
||||
func (mr *MergeRequest) IsOpen() bool {
|
||||
return mr.Status == MROpen
|
||||
|
||||
@@ -255,3 +255,53 @@ func TestMergeRequest_StatusChecks(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailureType_FailureLabel(t *testing.T) {
|
||||
tests := []struct {
|
||||
failureType FailureType
|
||||
wantLabel string
|
||||
}{
|
||||
{FailureNone, ""},
|
||||
{FailureConflict, "needs-rebase"},
|
||||
{FailureTestsFail, "needs-fix"},
|
||||
{FailureBuildFail, "needs-fix"},
|
||||
{FailureFlakyTest, "needs-fix"},
|
||||
{FailurePushFail, "needs-retry"},
|
||||
{FailureFetch, ""},
|
||||
{FailureCheckout, ""},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(string(tt.failureType), func(t *testing.T) {
|
||||
got := tt.failureType.FailureLabel()
|
||||
if got != tt.wantLabel {
|
||||
t.Errorf("FailureLabel() = %q, want %q", got, tt.wantLabel)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailureType_ShouldAssignToWorker(t *testing.T) {
|
||||
tests := []struct {
|
||||
failureType FailureType
|
||||
wantAssign bool
|
||||
}{
|
||||
{FailureNone, false},
|
||||
{FailureConflict, true},
|
||||
{FailureTestsFail, true},
|
||||
{FailureBuildFail, true},
|
||||
{FailureFlakyTest, true},
|
||||
{FailurePushFail, false},
|
||||
{FailureFetch, false},
|
||||
{FailureCheckout, false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(string(tt.failureType), func(t *testing.T) {
|
||||
got := tt.failureType.ShouldAssignToWorker()
|
||||
if got != tt.wantAssign {
|
||||
t.Errorf("ShouldAssignToWorker() = %v, want %v", got, tt.wantAssign)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user