Refinery emits activity events for MQ lifecycle (gt-ytsxp)

Add merge queue activity events to the refinery:
- merge_started: When refinery begins processing an MR
- merged: When MR successfully merged to main
- merge_failed: When merge fails (conflict, tests, push, etc.)
- merge_skipped: When MR skipped (superseded)

Events include MR ID, worker, branch, and reason (for failures).
Implemented in both Manager.ProcessMR and Engineer.ProcessMRFromQueue.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-29 23:46:40 -08:00
parent e362be3c41
commit 65d4dbe222
3 changed files with 55 additions and 0 deletions

View File

@@ -45,6 +45,12 @@ const (
TypeNudge = "nudge"
TypeBoot = "boot"
TypeHalt = "halt"
// Merge queue events (emitted by refinery)
TypeMergeStarted = "merge_started"
TypeMerged = "merged"
TypeMergeFailed = "merge_failed"
TypeMergeSkipped = "merge_skipped"
)
// EventsFile is the name of the raw events log.
@@ -172,3 +178,20 @@ func BootPayload(rig string, agents []string) map[string]interface{} {
"agents": agents,
}
}
// MergePayload creates a payload for merge queue events.
// mrID: merge request ID
// worker: polecat name that submitted the work
// branch: source branch being merged
// reason: failure reason (for merge_failed/merge_skipped events)
func MergePayload(mrID, worker, branch, reason string) map[string]interface{} {
p := map[string]interface{}{
"mr": mrID,
"worker": worker,
"branch": branch,
}
if reason != "" {
p["reason"] = reason
}
return p
}

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/events"
"github.com/steveyegge/gastown/internal/git"
"github.com/steveyegge/gastown/internal/mrqueue"
"github.com/steveyegge/gastown/internal/rig"
@@ -374,6 +375,10 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) {
// ProcessMRFromQueue processes a merge request from wisp queue.
func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult {
// Emit merge_started event
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
_ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
// MR fields are directly on the struct (no parsing needed)
fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:")
fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch)
@@ -391,6 +396,11 @@ func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) Proce
// handleSuccessFromQueue handles a successful merge from wisp queue.
func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) {
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
// Emit merged event
_ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
// 1. Close source issue with reference to MR
if mr.SourceIssue != "" {
closeReason := fmt.Sprintf("Merged in %s", mr.ID)
@@ -421,6 +431,11 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult)
// handleFailureFromQueue handles a failed merge from wisp queue.
func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) {
actor := fmt.Sprintf("%s/refinery", e.rig.Name)
// Emit merge_failed event
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
// MR stays in queue for retry - no action needed on the file
// Log the failure
fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error)

View File

@@ -15,6 +15,7 @@ import (
"github.com/steveyegge/gastown/internal/claude"
"github.com/steveyegge/gastown/internal/config"
"github.com/steveyegge/gastown/internal/events"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/rig"
"github.com/steveyegge/gastown/internal/tmux"
@@ -394,11 +395,16 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
ref.CurrentMR = mr
_ = m.saveState(ref) // non-fatal: state file update
// Emit merge_started event
actor := fmt.Sprintf("%s/refinery", m.rig.Name)
_ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
result := MergeResult{}
// 1. Fetch the branch
if err := m.gitRun("fetch", "origin", mr.Branch); err != nil {
result.Error = fmt.Sprintf("fetch failed: %v", err)
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
@@ -406,6 +412,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
// 2. Checkout target branch
if err := m.gitRun("checkout", mr.TargetBranch); err != nil {
result.Error = fmt.Sprintf("checkout target failed: %v", err)
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
@@ -425,12 +432,14 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
result.Error = "merge conflict"
// Abort the merge (best-effort cleanup)
_ = m.gitRun("merge", "--abort")
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "merge conflict"))
m.completeMR(mr, "", "merge conflict - polecat must rebase") // Reopen for rebase
// Notify worker about conflict
m.notifyWorkerConflict(mr)
return result
}
result.Error = fmt.Sprintf("merge failed: %v", err)
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
@@ -442,6 +451,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
result.Error = fmt.Sprintf("tests failed: %v", err)
// Reset to before merge (best-effort rollback)
_ = m.gitRun("reset", "--hard", "HEAD~1")
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
m.completeMR(mr, "", result.Error) // Reopen for fixes
return result
}
@@ -452,6 +462,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
result.Error = fmt.Sprintf("push failed: %v", err)
// Reset to before merge (best-effort rollback)
_ = m.gitRun("reset", "--hard", "HEAD~1")
_ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error))
m.completeMR(mr, "", result.Error) // Reopen for retry
return result
}
@@ -467,6 +478,9 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
result.MergeCommit = mergeCommit
m.completeMR(mr, CloseReasonMerged, "")
// Emit merged event
_ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, ""))
// Notify worker of success
m.notifyWorkerMerged(mr)
@@ -487,6 +501,7 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s
ref.CurrentMR = nil
now := time.Now()
actor := fmt.Sprintf("%s/refinery", m.rig.Name)
if closeReason != "" {
// Close the MR (in_progress → closed)
@@ -501,6 +516,8 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s
ref.Stats.TodayMerged++
case CloseReasonSuperseded:
ref.Stats.TotalSkipped++
// Emit merge_skipped event
_ = events.LogFeed(events.TypeMergeSkipped, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "superseded"))
default:
// Other close reasons (rejected, conflict) count as failed
ref.Stats.TotalFailed++