From 65d4dbe222bf355e7f084007d06be87d47d39caa Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Mon, 29 Dec 2025 23:46:40 -0800 Subject: [PATCH] Refinery emits activity events for MQ lifecycle (gt-ytsxp) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add merge queue activity events to the refinery: - merge_started: When refinery begins processing an MR - merged: When MR successfully merged to main - merge_failed: When merge fails (conflict, tests, push, etc.) - merge_skipped: When MR skipped (superseded) Events include MR ID, worker, branch, and reason (for failures). Implemented in both Manager.ProcessMR and Engineer.ProcessMRFromQueue. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/events/events.go | 23 +++++++++++++++++++++++ internal/refinery/engineer.go | 15 +++++++++++++++ internal/refinery/manager.go | 17 +++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/internal/events/events.go b/internal/events/events.go index e4ea3396..ac2543a8 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -45,6 +45,12 @@ const ( TypeNudge = "nudge" TypeBoot = "boot" TypeHalt = "halt" + + // Merge queue events (emitted by refinery) + TypeMergeStarted = "merge_started" + TypeMerged = "merged" + TypeMergeFailed = "merge_failed" + TypeMergeSkipped = "merge_skipped" ) // EventsFile is the name of the raw events log. @@ -172,3 +178,20 @@ func BootPayload(rig string, agents []string) map[string]interface{} { "agents": agents, } } + +// MergePayload creates a payload for merge queue events. +// mrID: merge request ID +// worker: polecat name that submitted the work +// branch: source branch being merged +// reason: failure reason (for merge_failed/merge_skipped events) +func MergePayload(mrID, worker, branch, reason string) map[string]interface{} { + p := map[string]interface{}{ + "mr": mrID, + "worker": worker, + "branch": branch, + } + if reason != "" { + p["reason"] = reason + } + return p +} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index c2649fc4..021cdeb0 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/git" "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/rig" @@ -374,6 +375,10 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) { // ProcessMRFromQueue processes a merge request from wisp queue. func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult { + // Emit merge_started event + actor := fmt.Sprintf("%s/refinery", e.rig.Name) + _ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) + // MR fields are directly on the struct (no parsing needed) fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:") fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch) @@ -391,6 +396,11 @@ func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) Proce // handleSuccessFromQueue handles a successful merge from wisp queue. func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) { + actor := fmt.Sprintf("%s/refinery", e.rig.Name) + + // Emit merged event + _ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) + // 1. Close source issue with reference to MR if mr.SourceIssue != "" { closeReason := fmt.Sprintf("Merged in %s", mr.ID) @@ -421,6 +431,11 @@ func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) // handleFailureFromQueue handles a failed merge from wisp queue. func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) { + actor := fmt.Sprintf("%s/refinery", e.rig.Name) + + // Emit merge_failed event + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) + // MR stays in queue for retry - no action needed on the file // Log the failure fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) diff --git a/internal/refinery/manager.go b/internal/refinery/manager.go index 0ee14225..cd82899e 100644 --- a/internal/refinery/manager.go +++ b/internal/refinery/manager.go @@ -15,6 +15,7 @@ import ( "github.com/steveyegge/gastown/internal/claude" "github.com/steveyegge/gastown/internal/config" + "github.com/steveyegge/gastown/internal/events" "github.com/steveyegge/gastown/internal/mail" "github.com/steveyegge/gastown/internal/rig" "github.com/steveyegge/gastown/internal/tmux" @@ -394,11 +395,16 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { ref.CurrentMR = mr _ = m.saveState(ref) // non-fatal: state file update + // Emit merge_started event + actor := fmt.Sprintf("%s/refinery", m.rig.Name) + _ = events.LogFeed(events.TypeMergeStarted, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) + result := MergeResult{} // 1. Fetch the branch if err := m.gitRun("fetch", "origin", mr.Branch); err != nil { result.Error = fmt.Sprintf("fetch failed: %v", err) + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) m.completeMR(mr, "", result.Error) // Reopen for retry return result } @@ -406,6 +412,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { // 2. Checkout target branch if err := m.gitRun("checkout", mr.TargetBranch); err != nil { result.Error = fmt.Sprintf("checkout target failed: %v", err) + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) m.completeMR(mr, "", result.Error) // Reopen for retry return result } @@ -425,12 +432,14 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { result.Error = "merge conflict" // Abort the merge (best-effort cleanup) _ = m.gitRun("merge", "--abort") + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "merge conflict")) m.completeMR(mr, "", "merge conflict - polecat must rebase") // Reopen for rebase // Notify worker about conflict m.notifyWorkerConflict(mr) return result } result.Error = fmt.Sprintf("merge failed: %v", err) + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) m.completeMR(mr, "", result.Error) // Reopen for retry return result } @@ -442,6 +451,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { result.Error = fmt.Sprintf("tests failed: %v", err) // Reset to before merge (best-effort rollback) _ = m.gitRun("reset", "--hard", "HEAD~1") + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) m.completeMR(mr, "", result.Error) // Reopen for fixes return result } @@ -452,6 +462,7 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { result.Error = fmt.Sprintf("push failed: %v", err) // Reset to before merge (best-effort rollback) _ = m.gitRun("reset", "--hard", "HEAD~1") + _ = events.LogFeed(events.TypeMergeFailed, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, result.Error)) m.completeMR(mr, "", result.Error) // Reopen for retry return result } @@ -467,6 +478,9 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult { result.MergeCommit = mergeCommit m.completeMR(mr, CloseReasonMerged, "") + // Emit merged event + _ = events.LogFeed(events.TypeMerged, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "")) + // Notify worker of success m.notifyWorkerMerged(mr) @@ -487,6 +501,7 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s ref.CurrentMR = nil now := time.Now() + actor := fmt.Sprintf("%s/refinery", m.rig.Name) if closeReason != "" { // Close the MR (in_progress → closed) @@ -501,6 +516,8 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s ref.Stats.TodayMerged++ case CloseReasonSuperseded: ref.Stats.TotalSkipped++ + // Emit merge_skipped event + _ = events.LogFeed(events.TypeMergeSkipped, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "superseded")) default: // Other close reasons (rejected, conflict) count as failed ref.Stats.TotalFailed++