diff --git a/docs/mail-protocol.md b/docs/mail-protocol.md index 44befd87..b9715295 100644 --- a/docs/mail-protocol.md +++ b/docs/mail-protocol.md @@ -61,13 +61,73 @@ Verified: clean git state, issue closed ``` Branch: Issue: +Polecat: +Rig: +Target: Merged-At: +Merge-Commit: ``` **Trigger**: Refinery sends after successful merge to main. **Handler**: Witness completes cleanup wisp, nukes polecat worktree. +### MERGE_FAILED + +**Route**: Refinery → Witness + +**Purpose**: Notify that merge attempt failed (tests, build, or other non-conflict error). + +**Subject format**: `MERGE_FAILED ` + +**Body format**: +``` +Branch: +Issue: +Polecat: +Rig: +Target: +Failed-At: +Failure-Type: +Error: +``` + +**Trigger**: Refinery sends when merge fails for non-conflict reasons. + +**Handler**: Witness notifies polecat, assigns work back for rework. + +### REWORK_REQUEST + +**Route**: Refinery → Witness + +**Purpose**: Request polecat to rebase branch due to merge conflicts. + +**Subject format**: `REWORK_REQUEST ` + +**Body format**: +``` +Branch: +Issue: +Polecat: +Rig: +Target: +Requested-At: +Conflict-Files: , , ... + +Please rebase your changes onto : + + git fetch origin + git rebase origin/ + # Resolve any conflicts + git push -f + +The Refinery will retry the merge after rebase is complete. +``` + +**Trigger**: Refinery sends when merge has conflicts with target branch. + +**Handler**: Witness notifies polecat with rebase instructions. + ### WITNESS_PING **Route**: Witness → Deacon (all witnesses send) @@ -184,15 +244,52 @@ Polecat Witness Refinery │ │ MERGE_READY │ │ │─────────────────────────>│ │ │ │ - │ │ (merge to main) + │ │ (merge attempt) │ │ │ - │ │ MERGED │ + │ │ MERGED (success) │ │ │<─────────────────────────│ │ │ │ │ (nuke polecat) │ │ │ │ ``` +### Merge Failure Flow + +``` + Witness Refinery + │ │ + │ (merge fails) + │ │ + │ MERGE_FAILED │ + ┌──────────────────────────│<─────────────────────────│ + │ │ │ + │ (failure notification) │ │ + │<─────────────────────────│ │ + │ │ │ +Polecat (rework needed) +``` + +### Rebase Required Flow + +``` + Witness Refinery + │ │ + │ (conflict detected) + │ │ + │ REWORK_REQUEST │ + ┌──────────────────────────│<─────────────────────────│ + │ │ │ + │ (rebase instructions) │ │ + │<─────────────────────────│ │ + │ │ │ +Polecat │ │ + │ │ │ + │ (rebases, gt done) │ │ + │─────────────────────────>│ MERGE_READY │ + │ │─────────────────────────>│ + │ │ (retry merge) +``` + ### Second-Order Monitoring ``` @@ -261,3 +358,4 @@ flexible enough for human debugging. - `docs/agent-as-bead.md` - Agent identity and slots - `.beads/formulas/mol-witness-patrol.formula.toml` - Witness handling - `internal/mail/` - Mail routing implementation +- `internal/protocol/` - Protocol handlers for Witness-Refinery communication diff --git a/internal/protocol/handlers.go b/internal/protocol/handlers.go new file mode 100644 index 00000000..c7ef1805 --- /dev/null +++ b/internal/protocol/handlers.go @@ -0,0 +1,124 @@ +package protocol + +import ( + "fmt" + + "github.com/steveyegge/gastown/internal/mail" +) + +// Handler processes a protocol message and returns an error if processing failed. +type Handler func(msg *mail.Message) error + +// HandlerRegistry maps message types to their handlers. +type HandlerRegistry struct { + handlers map[MessageType]Handler +} + +// NewHandlerRegistry creates a new handler registry. +func NewHandlerRegistry() *HandlerRegistry { + return &HandlerRegistry{ + handlers: make(map[MessageType]Handler), + } +} + +// Register adds a handler for a specific message type. +func (r *HandlerRegistry) Register(msgType MessageType, handler Handler) { + r.handlers[msgType] = handler +} + +// Handle dispatches a message to the appropriate handler. +// Returns an error if no handler is registered for the message type. +func (r *HandlerRegistry) Handle(msg *mail.Message) error { + msgType := ParseMessageType(msg.Subject) + if msgType == "" { + return fmt.Errorf("unknown message type for subject: %s", msg.Subject) + } + + handler, ok := r.handlers[msgType] + if !ok { + return fmt.Errorf("no handler registered for message type: %s", msgType) + } + + return handler(msg) +} + +// CanHandle returns true if a handler is registered for the message's type. +func (r *HandlerRegistry) CanHandle(msg *mail.Message) bool { + msgType := ParseMessageType(msg.Subject) + if msgType == "" { + return false + } + + _, ok := r.handlers[msgType] + return ok +} + +// WitnessHandler defines the interface for Witness protocol handlers. +// The Witness receives messages from Refinery about merge status. +type WitnessHandler interface { + // HandleMerged is called when a branch was successfully merged. + HandleMerged(payload *MergedPayload) error + + // HandleMergeFailed is called when a merge attempt failed. + HandleMergeFailed(payload *MergeFailedPayload) error + + // HandleReworkRequest is called when a branch needs rebasing. + HandleReworkRequest(payload *ReworkRequestPayload) error +} + +// RefineryHandler defines the interface for Refinery protocol handlers. +// The Refinery receives messages from Witness about ready branches. +type RefineryHandler interface { + // HandleMergeReady is called when a polecat's work is verified and ready. + HandleMergeReady(payload *MergeReadyPayload) error +} + +// WrapWitnessHandlers creates mail handlers from a WitnessHandler. +func WrapWitnessHandlers(h WitnessHandler) *HandlerRegistry { + registry := NewHandlerRegistry() + + registry.Register(TypeMerged, func(msg *mail.Message) error { + payload := ParseMergedPayload(msg.Body) + return h.HandleMerged(payload) + }) + + registry.Register(TypeMergeFailed, func(msg *mail.Message) error { + payload := ParseMergeFailedPayload(msg.Body) + return h.HandleMergeFailed(payload) + }) + + registry.Register(TypeReworkRequest, func(msg *mail.Message) error { + payload := ParseReworkRequestPayload(msg.Body) + return h.HandleReworkRequest(payload) + }) + + return registry +} + +// WrapRefineryHandlers creates mail handlers from a RefineryHandler. +func WrapRefineryHandlers(h RefineryHandler) *HandlerRegistry { + registry := NewHandlerRegistry() + + registry.Register(TypeMergeReady, func(msg *mail.Message) error { + payload := ParseMergeReadyPayload(msg.Body) + return h.HandleMergeReady(payload) + }) + + return registry +} + +// ProcessProtocolMessage processes a protocol message using the registry. +// It returns (true, nil) if the message was handled successfully, +// (true, error) if handling failed, or (false, nil) if not a protocol message. +func (r *HandlerRegistry) ProcessProtocolMessage(msg *mail.Message) (bool, error) { + if !IsProtocolMessage(msg.Subject) { + return false, nil + } + + if !r.CanHandle(msg) { + return false, nil + } + + err := r.Handle(msg) + return true, err +} diff --git a/internal/protocol/messages.go b/internal/protocol/messages.go new file mode 100644 index 00000000..54d26801 --- /dev/null +++ b/internal/protocol/messages.go @@ -0,0 +1,288 @@ +package protocol + +import ( + "fmt" + "strings" + "time" + + "github.com/steveyegge/gastown/internal/mail" +) + +// NewMergeReadyMessage creates a MERGE_READY protocol message. +// Sent by Witness to Refinery when a polecat's work is verified and ready. +func NewMergeReadyMessage(rig, polecat, branch, issue string) *mail.Message { + payload := MergeReadyPayload{ + Branch: branch, + Issue: issue, + Polecat: polecat, + Rig: rig, + Verified: "clean git state, issue closed", + Timestamp: time.Now(), + } + + body := formatMergeReadyBody(payload) + + msg := mail.NewMessage( + fmt.Sprintf("%s/witness", rig), + fmt.Sprintf("%s/refinery", rig), + fmt.Sprintf("MERGE_READY %s", polecat), + body, + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeTask + + return msg +} + +// formatMergeReadyBody formats the body of a MERGE_READY message. +func formatMergeReadyBody(p MergeReadyPayload) string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch)) + sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue)) + sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat)) + sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig)) + if p.Verified != "" { + sb.WriteString(fmt.Sprintf("Verified: %s\n", p.Verified)) + } + return sb.String() +} + +// NewMergedMessage creates a MERGED protocol message. +// Sent by Refinery to Witness when a branch is successfully merged. +func NewMergedMessage(rig, polecat, branch, issue, targetBranch, mergeCommit string) *mail.Message { + payload := MergedPayload{ + Branch: branch, + Issue: issue, + Polecat: polecat, + Rig: rig, + MergedAt: time.Now(), + MergeCommit: mergeCommit, + TargetBranch: targetBranch, + } + + body := formatMergedBody(payload) + + msg := mail.NewMessage( + fmt.Sprintf("%s/refinery", rig), + fmt.Sprintf("%s/witness", rig), + fmt.Sprintf("MERGED %s", polecat), + body, + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeNotification + + return msg +} + +// formatMergedBody formats the body of a MERGED message. +func formatMergedBody(p MergedPayload) string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch)) + sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue)) + sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat)) + sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig)) + sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch)) + sb.WriteString(fmt.Sprintf("Merged-At: %s\n", p.MergedAt.Format(time.RFC3339))) + if p.MergeCommit != "" { + sb.WriteString(fmt.Sprintf("Merge-Commit: %s\n", p.MergeCommit)) + } + return sb.String() +} + +// NewMergeFailedMessage creates a MERGE_FAILED protocol message. +// Sent by Refinery to Witness when merge fails (tests, build, etc.). +func NewMergeFailedMessage(rig, polecat, branch, issue, targetBranch, failureType, errorMsg string) *mail.Message { + payload := MergeFailedPayload{ + Branch: branch, + Issue: issue, + Polecat: polecat, + Rig: rig, + FailedAt: time.Now(), + FailureType: failureType, + Error: errorMsg, + TargetBranch: targetBranch, + } + + body := formatMergeFailedBody(payload) + + msg := mail.NewMessage( + fmt.Sprintf("%s/refinery", rig), + fmt.Sprintf("%s/witness", rig), + fmt.Sprintf("MERGE_FAILED %s", polecat), + body, + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeTask + + return msg +} + +// formatMergeFailedBody formats the body of a MERGE_FAILED message. +func formatMergeFailedBody(p MergeFailedPayload) string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch)) + sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue)) + sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat)) + sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig)) + sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch)) + sb.WriteString(fmt.Sprintf("Failed-At: %s\n", p.FailedAt.Format(time.RFC3339))) + sb.WriteString(fmt.Sprintf("Failure-Type: %s\n", p.FailureType)) + sb.WriteString(fmt.Sprintf("Error: %s\n", p.Error)) + return sb.String() +} + +// NewReworkRequestMessage creates a REWORK_REQUEST protocol message. +// Sent by Refinery to Witness when a branch needs rebasing due to conflicts. +func NewReworkRequestMessage(rig, polecat, branch, issue, targetBranch string, conflictFiles []string) *mail.Message { + payload := ReworkRequestPayload{ + Branch: branch, + Issue: issue, + Polecat: polecat, + Rig: rig, + RequestedAt: time.Now(), + TargetBranch: targetBranch, + ConflictFiles: conflictFiles, + Instructions: formatRebaseInstructions(targetBranch), + } + + body := formatReworkRequestBody(payload) + + msg := mail.NewMessage( + fmt.Sprintf("%s/refinery", rig), + fmt.Sprintf("%s/witness", rig), + fmt.Sprintf("REWORK_REQUEST %s", polecat), + body, + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeTask + + return msg +} + +// formatReworkRequestBody formats the body of a REWORK_REQUEST message. +func formatReworkRequestBody(p ReworkRequestPayload) string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch)) + sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue)) + sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat)) + sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig)) + sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch)) + sb.WriteString(fmt.Sprintf("Requested-At: %s\n", p.RequestedAt.Format(time.RFC3339))) + + if len(p.ConflictFiles) > 0 { + sb.WriteString(fmt.Sprintf("Conflict-Files: %s\n", strings.Join(p.ConflictFiles, ", "))) + } + + sb.WriteString("\n") + sb.WriteString(p.Instructions) + + return sb.String() +} + +// formatRebaseInstructions returns standard rebase instructions. +func formatRebaseInstructions(targetBranch string) string { + return fmt.Sprintf(`Please rebase your changes onto %s: + + git fetch origin + git rebase origin/%s + # Resolve any conflicts + git push -f + +The Refinery will retry the merge after rebase is complete.`, targetBranch, targetBranch) +} + +// ParseMergeReadyPayload parses a MERGE_READY message body into a payload. +func ParseMergeReadyPayload(body string) *MergeReadyPayload { + return &MergeReadyPayload{ + Branch: parseField(body, "Branch"), + Issue: parseField(body, "Issue"), + Polecat: parseField(body, "Polecat"), + Rig: parseField(body, "Rig"), + Verified: parseField(body, "Verified"), + Timestamp: time.Now(), // Use current time if not parseable + } +} + +// ParseMergedPayload parses a MERGED message body into a payload. +func ParseMergedPayload(body string) *MergedPayload { + payload := &MergedPayload{ + Branch: parseField(body, "Branch"), + Issue: parseField(body, "Issue"), + Polecat: parseField(body, "Polecat"), + Rig: parseField(body, "Rig"), + TargetBranch: parseField(body, "Target"), + MergeCommit: parseField(body, "Merge-Commit"), + } + + // Parse timestamp + if ts := parseField(body, "Merged-At"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + payload.MergedAt = t + } + } + + return payload +} + +// ParseMergeFailedPayload parses a MERGE_FAILED message body into a payload. +func ParseMergeFailedPayload(body string) *MergeFailedPayload { + payload := &MergeFailedPayload{ + Branch: parseField(body, "Branch"), + Issue: parseField(body, "Issue"), + Polecat: parseField(body, "Polecat"), + Rig: parseField(body, "Rig"), + TargetBranch: parseField(body, "Target"), + FailureType: parseField(body, "Failure-Type"), + Error: parseField(body, "Error"), + } + + // Parse timestamp + if ts := parseField(body, "Failed-At"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + payload.FailedAt = t + } + } + + return payload +} + +// ParseReworkRequestPayload parses a REWORK_REQUEST message body into a payload. +func ParseReworkRequestPayload(body string) *ReworkRequestPayload { + payload := &ReworkRequestPayload{ + Branch: parseField(body, "Branch"), + Issue: parseField(body, "Issue"), + Polecat: parseField(body, "Polecat"), + Rig: parseField(body, "Rig"), + TargetBranch: parseField(body, "Target"), + } + + // Parse timestamp + if ts := parseField(body, "Requested-At"); ts != "" { + if t, err := time.Parse(time.RFC3339, ts); err == nil { + payload.RequestedAt = t + } + } + + // Parse conflict files + if files := parseField(body, "Conflict-Files"); files != "" { + payload.ConflictFiles = strings.Split(files, ", ") + } + + return payload +} + +// parseField extracts a field value from a key-value body format. +// Format: "Key: value" +func parseField(body, key string) string { + lines := strings.Split(body, "\n") + prefix := key + ": " + + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, prefix) { + return strings.TrimPrefix(line, prefix) + } + } + + return "" +} diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go new file mode 100644 index 00000000..4ec71e92 --- /dev/null +++ b/internal/protocol/protocol_test.go @@ -0,0 +1,378 @@ +package protocol + +import ( + "bytes" + "strings" + "testing" + "time" + + "github.com/steveyegge/gastown/internal/mail" +) + +func TestParseMessageType(t *testing.T) { + tests := []struct { + subject string + expected MessageType + }{ + {"MERGE_READY nux", TypeMergeReady}, + {"MERGED Toast", TypeMerged}, + {"MERGE_FAILED ace", TypeMergeFailed}, + {"REWORK_REQUEST valkyrie", TypeReworkRequest}, + {"MERGE_READY", TypeMergeReady}, // no polecat name + {"Unknown subject", ""}, + {"", ""}, + {" MERGE_READY nux ", TypeMergeReady}, // with whitespace + } + + for _, tt := range tests { + t.Run(tt.subject, func(t *testing.T) { + result := ParseMessageType(tt.subject) + if result != tt.expected { + t.Errorf("ParseMessageType(%q) = %q, want %q", tt.subject, result, tt.expected) + } + }) + } +} + +func TestExtractPolecat(t *testing.T) { + tests := []struct { + subject string + expected string + }{ + {"MERGE_READY nux", "nux"}, + {"MERGED Toast", "Toast"}, + {"MERGE_FAILED ace", "ace"}, + {"REWORK_REQUEST valkyrie", "valkyrie"}, + {"MERGE_READY", ""}, + {"", ""}, + {" MERGE_READY nux ", "nux"}, + } + + for _, tt := range tests { + t.Run(tt.subject, func(t *testing.T) { + result := ExtractPolecat(tt.subject) + if result != tt.expected { + t.Errorf("ExtractPolecat(%q) = %q, want %q", tt.subject, result, tt.expected) + } + }) + } +} + +func TestIsProtocolMessage(t *testing.T) { + tests := []struct { + subject string + expected bool + }{ + {"MERGE_READY nux", true}, + {"MERGED Toast", true}, + {"MERGE_FAILED ace", true}, + {"REWORK_REQUEST valkyrie", true}, + {"Unknown subject", false}, + {"", false}, + {"Hello world", false}, + } + + for _, tt := range tests { + t.Run(tt.subject, func(t *testing.T) { + result := IsProtocolMessage(tt.subject) + if result != tt.expected { + t.Errorf("IsProtocolMessage(%q) = %v, want %v", tt.subject, result, tt.expected) + } + }) + } +} + +func TestNewMergeReadyMessage(t *testing.T) { + msg := NewMergeReadyMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc") + + if msg.Subject != "MERGE_READY nux" { + t.Errorf("Subject = %q, want %q", msg.Subject, "MERGE_READY nux") + } + if msg.From != "gastown/witness" { + t.Errorf("From = %q, want %q", msg.From, "gastown/witness") + } + if msg.To != "gastown/refinery" { + t.Errorf("To = %q, want %q", msg.To, "gastown/refinery") + } + if msg.Priority != mail.PriorityHigh { + t.Errorf("Priority = %q, want %q", msg.Priority, mail.PriorityHigh) + } + if !strings.Contains(msg.Body, "Branch: polecat/nux/gt-abc") { + t.Errorf("Body missing branch: %s", msg.Body) + } + if !strings.Contains(msg.Body, "Issue: gt-abc") { + t.Errorf("Body missing issue: %s", msg.Body) + } +} + +func TestNewMergedMessage(t *testing.T) { + msg := NewMergedMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", "abc123") + + if msg.Subject != "MERGED nux" { + t.Errorf("Subject = %q, want %q", msg.Subject, "MERGED nux") + } + if msg.From != "gastown/refinery" { + t.Errorf("From = %q, want %q", msg.From, "gastown/refinery") + } + if msg.To != "gastown/witness" { + t.Errorf("To = %q, want %q", msg.To, "gastown/witness") + } + if !strings.Contains(msg.Body, "Merge-Commit: abc123") { + t.Errorf("Body missing merge commit: %s", msg.Body) + } +} + +func TestNewMergeFailedMessage(t *testing.T) { + msg := NewMergeFailedMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", "tests", "Test failed") + + if msg.Subject != "MERGE_FAILED nux" { + t.Errorf("Subject = %q, want %q", msg.Subject, "MERGE_FAILED nux") + } + if !strings.Contains(msg.Body, "Failure-Type: tests") { + t.Errorf("Body missing failure type: %s", msg.Body) + } + if !strings.Contains(msg.Body, "Error: Test failed") { + t.Errorf("Body missing error: %s", msg.Body) + } +} + +func TestNewReworkRequestMessage(t *testing.T) { + conflicts := []string{"file1.go", "file2.go"} + msg := NewReworkRequestMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", conflicts) + + if msg.Subject != "REWORK_REQUEST nux" { + t.Errorf("Subject = %q, want %q", msg.Subject, "REWORK_REQUEST nux") + } + if !strings.Contains(msg.Body, "Conflict-Files: file1.go, file2.go") { + t.Errorf("Body missing conflict files: %s", msg.Body) + } + if !strings.Contains(msg.Body, "git rebase origin/main") { + t.Errorf("Body missing rebase instructions: %s", msg.Body) + } +} + +func TestParseMergeReadyPayload(t *testing.T) { + body := `Branch: polecat/nux/gt-abc +Issue: gt-abc +Polecat: nux +Rig: gastown +Verified: clean git state` + + payload := ParseMergeReadyPayload(body) + + if payload.Branch != "polecat/nux/gt-abc" { + t.Errorf("Branch = %q, want %q", payload.Branch, "polecat/nux/gt-abc") + } + if payload.Issue != "gt-abc" { + t.Errorf("Issue = %q, want %q", payload.Issue, "gt-abc") + } + if payload.Polecat != "nux" { + t.Errorf("Polecat = %q, want %q", payload.Polecat, "nux") + } + if payload.Rig != "gastown" { + t.Errorf("Rig = %q, want %q", payload.Rig, "gastown") + } +} + +func TestParseMergedPayload(t *testing.T) { + ts := time.Now().Format(time.RFC3339) + body := `Branch: polecat/nux/gt-abc +Issue: gt-abc +Polecat: nux +Rig: gastown +Target: main +Merged-At: ` + ts + ` +Merge-Commit: abc123` + + payload := ParseMergedPayload(body) + + if payload.Branch != "polecat/nux/gt-abc" { + t.Errorf("Branch = %q, want %q", payload.Branch, "polecat/nux/gt-abc") + } + if payload.MergeCommit != "abc123" { + t.Errorf("MergeCommit = %q, want %q", payload.MergeCommit, "abc123") + } + if payload.TargetBranch != "main" { + t.Errorf("TargetBranch = %q, want %q", payload.TargetBranch, "main") + } +} + +func TestHandlerRegistry(t *testing.T) { + registry := NewHandlerRegistry() + + handled := false + registry.Register(TypeMergeReady, func(msg *mail.Message) error { + handled = true + return nil + }) + + msg := &mail.Message{Subject: "MERGE_READY nux"} + + if !registry.CanHandle(msg) { + t.Error("Registry should be able to handle MERGE_READY message") + } + + if err := registry.Handle(msg); err != nil { + t.Errorf("Handle returned error: %v", err) + } + + if !handled { + t.Error("Handler was not called") + } + + // Test unregistered message type + unknownMsg := &mail.Message{Subject: "UNKNOWN message"} + if registry.CanHandle(unknownMsg) { + t.Error("Registry should not handle unknown message type") + } +} + +func TestWrapWitnessHandlers(t *testing.T) { + handler := &mockWitnessHandler{} + registry := WrapWitnessHandlers(handler) + + // Test MERGED + mergedMsg := &mail.Message{ + Subject: "MERGED nux", + Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main", + } + if err := registry.Handle(mergedMsg); err != nil { + t.Errorf("HandleMerged error: %v", err) + } + if !handler.mergedCalled { + t.Error("HandleMerged was not called") + } + + // Test MERGE_FAILED + failedMsg := &mail.Message{ + Subject: "MERGE_FAILED nux", + Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main\nFailure-Type: tests\nError: failed", + } + if err := registry.Handle(failedMsg); err != nil { + t.Errorf("HandleMergeFailed error: %v", err) + } + if !handler.failedCalled { + t.Error("HandleMergeFailed was not called") + } + + // Test REWORK_REQUEST + reworkMsg := &mail.Message{ + Subject: "REWORK_REQUEST nux", + Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main", + } + if err := registry.Handle(reworkMsg); err != nil { + t.Errorf("HandleReworkRequest error: %v", err) + } + if !handler.reworkCalled { + t.Error("HandleReworkRequest was not called") + } +} + +func TestWrapRefineryHandlers(t *testing.T) { + handler := &mockRefineryHandler{} + registry := WrapRefineryHandlers(handler) + + msg := &mail.Message{ + Subject: "MERGE_READY nux", + Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown", + } + + if err := registry.Handle(msg); err != nil { + t.Errorf("HandleMergeReady error: %v", err) + } + if !handler.readyCalled { + t.Error("HandleMergeReady was not called") + } +} + +func TestDefaultWitnessHandler(t *testing.T) { + tmpDir := t.TempDir() + handler := NewWitnessHandler("gastown", tmpDir) + + // Capture output + var buf bytes.Buffer + handler.SetOutput(&buf) + + // Test HandleMerged + mergedPayload := &MergedPayload{ + Branch: "polecat/nux/gt-abc", + Issue: "gt-abc", + Polecat: "nux", + Rig: "gastown", + TargetBranch: "main", + MergeCommit: "abc123", + } + if err := handler.HandleMerged(mergedPayload); err != nil { + t.Errorf("HandleMerged error: %v", err) + } + if !strings.Contains(buf.String(), "MERGED received") { + t.Errorf("Output missing expected text: %s", buf.String()) + } + + // Test HandleMergeFailed + buf.Reset() + failedPayload := &MergeFailedPayload{ + Branch: "polecat/nux/gt-abc", + Issue: "gt-abc", + Polecat: "nux", + Rig: "gastown", + TargetBranch: "main", + FailureType: "tests", + Error: "Test failed", + } + if err := handler.HandleMergeFailed(failedPayload); err != nil { + t.Errorf("HandleMergeFailed error: %v", err) + } + if !strings.Contains(buf.String(), "MERGE_FAILED received") { + t.Errorf("Output missing expected text: %s", buf.String()) + } + + // Test HandleReworkRequest + buf.Reset() + reworkPayload := &ReworkRequestPayload{ + Branch: "polecat/nux/gt-abc", + Issue: "gt-abc", + Polecat: "nux", + Rig: "gastown", + TargetBranch: "main", + ConflictFiles: []string{"file1.go"}, + } + if err := handler.HandleReworkRequest(reworkPayload); err != nil { + t.Errorf("HandleReworkRequest error: %v", err) + } + if !strings.Contains(buf.String(), "REWORK_REQUEST received") { + t.Errorf("Output missing expected text: %s", buf.String()) + } +} + +// Mock handlers for testing + +type mockWitnessHandler struct { + mergedCalled bool + failedCalled bool + reworkCalled bool +} + +func (m *mockWitnessHandler) HandleMerged(payload *MergedPayload) error { + m.mergedCalled = true + return nil +} + +func (m *mockWitnessHandler) HandleMergeFailed(payload *MergeFailedPayload) error { + m.failedCalled = true + return nil +} + +func (m *mockWitnessHandler) HandleReworkRequest(payload *ReworkRequestPayload) error { + m.reworkCalled = true + return nil +} + +type mockRefineryHandler struct { + readyCalled bool +} + +func (m *mockRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error { + m.readyCalled = true + return nil +} diff --git a/internal/protocol/refinery_handlers.go b/internal/protocol/refinery_handlers.go new file mode 100644 index 00000000..47b06c8c --- /dev/null +++ b/internal/protocol/refinery_handlers.go @@ -0,0 +1,147 @@ +package protocol + +import ( + "fmt" + "io" + "os" + "time" + + "github.com/steveyegge/gastown/internal/mail" + "github.com/steveyegge/gastown/internal/mrqueue" +) + +// DefaultRefineryHandler provides the default implementation for Refinery protocol handlers. +// It receives MERGE_READY messages from the Witness and adds work to the merge queue. +type DefaultRefineryHandler struct { + // Rig is the name of the rig this refinery processes. + Rig string + + // WorkDir is the working directory for operations. + WorkDir string + + // Queue is the merge request queue. + Queue *mrqueue.Queue + + // Router is used to send mail messages. + Router *mail.Router + + // Output is where to write status messages. + Output io.Writer +} + +// NewRefineryHandler creates a new DefaultRefineryHandler. +func NewRefineryHandler(rig, workDir string) *DefaultRefineryHandler { + return &DefaultRefineryHandler{ + Rig: rig, + WorkDir: workDir, + Queue: mrqueue.New(workDir), + Router: mail.NewRouter(workDir), + Output: os.Stdout, + } +} + +// SetOutput sets the output writer for status messages. +func (h *DefaultRefineryHandler) SetOutput(w io.Writer) { + h.Output = w +} + +// HandleMergeReady handles a MERGE_READY message from Witness. +// When a polecat's work is verified and ready, the Refinery: +// 1. Validates the merge request +// 2. Adds it to the merge queue +// 3. Acknowledges receipt +func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error { + fmt.Fprintf(h.Output, "[Refinery] MERGE_READY received for polecat %s\n", payload.Polecat) + fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch) + fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue) + fmt.Fprintf(h.Output, " Verified: %s\n", payload.Verified) + + // Validate required fields + if payload.Branch == "" { + return fmt.Errorf("missing branch in MERGE_READY payload") + } + if payload.Polecat == "" { + return fmt.Errorf("missing polecat in MERGE_READY payload") + } + + // Create merge request (ID is generated by Submit if empty) + mr := &mrqueue.MR{ + Branch: payload.Branch, + Worker: payload.Polecat, + SourceIssue: payload.Issue, + Target: "main", // Default target, could be passed in payload + Rig: payload.Rig, + Title: fmt.Sprintf("Merge %s work on %s", payload.Polecat, payload.Issue), + CreatedAt: time.Now(), + } + + // Add to queue + if err := h.Queue.Submit(mr); err != nil { + fmt.Fprintf(h.Output, "[Refinery] Error adding to queue: %v\n", err) + return fmt.Errorf("failed to add merge request to queue: %w", err) + } + + fmt.Fprintf(h.Output, "[Refinery] ✓ Added to merge queue: %s\n", mr.ID) + fmt.Fprintf(h.Output, " Queue length: %d\n", h.Queue.Count()) + + return nil +} + +// SendMerged sends a MERGED message to the Witness. +// Called by the Refinery after successfully merging a branch. +func (h *DefaultRefineryHandler) SendMerged(polecat, branch, issue, targetBranch, mergeCommit string) error { + msg := NewMergedMessage(h.Rig, polecat, branch, issue, targetBranch, mergeCommit) + return h.Router.Send(msg) +} + +// SendMergeFailed sends a MERGE_FAILED message to the Witness. +// Called by the Refinery when a merge fails. +func (h *DefaultRefineryHandler) SendMergeFailed(polecat, branch, issue, targetBranch, failureType, errorMsg string) error { + msg := NewMergeFailedMessage(h.Rig, polecat, branch, issue, targetBranch, failureType, errorMsg) + return h.Router.Send(msg) +} + +// SendReworkRequest sends a REWORK_REQUEST message to the Witness. +// Called by the Refinery when a branch has conflicts. +func (h *DefaultRefineryHandler) SendReworkRequest(polecat, branch, issue, targetBranch string, conflictFiles []string) error { + msg := NewReworkRequestMessage(h.Rig, polecat, branch, issue, targetBranch, conflictFiles) + return h.Router.Send(msg) +} + +// NotifyMergeOutcome is a convenience method that sends the appropriate message +// based on the merge result. +type MergeOutcome struct { + // Success indicates whether the merge was successful. + Success bool + + // Conflict indicates the failure was due to conflicts (needs rebase). + Conflict bool + + // FailureType categorizes the failure (e.g., "tests", "build"). + FailureType string + + // Error is the error message if the merge failed. + Error string + + // MergeCommit is the SHA of the merge commit on success. + MergeCommit string + + // ConflictFiles lists files with conflicts (if Conflict is true). + ConflictFiles []string +} + +// NotifyMergeOutcome sends the appropriate protocol message based on the outcome. +func (h *DefaultRefineryHandler) NotifyMergeOutcome(polecat, branch, issue, targetBranch string, outcome MergeOutcome) error { + if outcome.Success { + return h.SendMerged(polecat, branch, issue, targetBranch, outcome.MergeCommit) + } + + if outcome.Conflict { + return h.SendReworkRequest(polecat, branch, issue, targetBranch, outcome.ConflictFiles) + } + + return h.SendMergeFailed(polecat, branch, issue, targetBranch, outcome.FailureType, outcome.Error) +} + +// Ensure DefaultRefineryHandler implements RefineryHandler. +var _ RefineryHandler = (*DefaultRefineryHandler)(nil) diff --git a/internal/protocol/types.go b/internal/protocol/types.go new file mode 100644 index 00000000..bbaff419 --- /dev/null +++ b/internal/protocol/types.go @@ -0,0 +1,182 @@ +// Package protocol provides inter-agent protocol message handling. +// +// This package defines protocol message types for Witness-Refinery communication +// and provides handlers for processing these messages. +// +// Protocol Message Types: +// - MERGE_READY: Witness → Refinery (branch ready for merge) +// - MERGED: Refinery → Witness (merge succeeded, cleanup ok) +// - MERGE_FAILED: Refinery → Witness (merge failed, needs rework) +// - REWORK_REQUEST: Refinery → Witness (rebase needed) +package protocol + +import ( + "strings" + "time" +) + +// MessageType identifies the protocol message type. +type MessageType string + +const ( + // TypeMergeReady is sent from Witness to Refinery when a polecat's work + // is verified and ready for merge queue processing. + // Subject format: "MERGE_READY " + TypeMergeReady MessageType = "MERGE_READY" + + // TypeMerged is sent from Refinery to Witness when a branch has been + // successfully merged to the target branch. + // Subject format: "MERGED " + TypeMerged MessageType = "MERGED" + + // TypeMergeFailed is sent from Refinery to Witness when a merge attempt + // failed (tests, build, or other non-conflict error). + // Subject format: "MERGE_FAILED " + TypeMergeFailed MessageType = "MERGE_FAILED" + + // TypeReworkRequest is sent from Refinery to Witness when a polecat's + // branch needs rebasing due to conflicts with the target branch. + // Subject format: "REWORK_REQUEST " + TypeReworkRequest MessageType = "REWORK_REQUEST" +) + +// ParseMessageType extracts the protocol message type from a mail subject. +// Returns empty string if subject doesn't match a known protocol type. +func ParseMessageType(subject string) MessageType { + subject = strings.TrimSpace(subject) + + // Check each known prefix + prefixes := []MessageType{ + TypeMergeReady, + TypeMerged, + TypeMergeFailed, + TypeReworkRequest, + } + + for _, prefix := range prefixes { + if strings.HasPrefix(subject, string(prefix)) { + return prefix + } + } + + return "" +} + +// MergeReadyPayload contains the data for a MERGE_READY message. +// Sent by Witness after verifying polecat work is complete. +type MergeReadyPayload struct { + // Branch is the polecat's work branch (e.g., "polecat/Toast/gt-abc"). + Branch string `json:"branch"` + + // Issue is the beads issue ID the polecat completed. + Issue string `json:"issue"` + + // Polecat is the worker name. + Polecat string `json:"polecat"` + + // Rig is the rig name containing the polecat. + Rig string `json:"rig"` + + // Verified contains verification notes. + Verified string `json:"verified,omitempty"` + + // Timestamp is when the message was created. + Timestamp time.Time `json:"timestamp"` +} + +// MergedPayload contains the data for a MERGED message. +// Sent by Refinery after successful merge to target branch. +type MergedPayload struct { + // Branch is the source branch that was merged. + Branch string `json:"branch"` + + // Issue is the beads issue ID. + Issue string `json:"issue"` + + // Polecat is the worker name. + Polecat string `json:"polecat"` + + // Rig is the rig name. + Rig string `json:"rig"` + + // MergedAt is when the merge completed. + MergedAt time.Time `json:"merged_at"` + + // MergeCommit is the SHA of the merge commit. + MergeCommit string `json:"merge_commit,omitempty"` + + // TargetBranch is the branch merged into (e.g., "main"). + TargetBranch string `json:"target_branch"` +} + +// MergeFailedPayload contains the data for a MERGE_FAILED message. +// Sent by Refinery when merge fails due to tests, build, or other errors. +type MergeFailedPayload struct { + // Branch is the source branch that failed to merge. + Branch string `json:"branch"` + + // Issue is the beads issue ID. + Issue string `json:"issue"` + + // Polecat is the worker name. + Polecat string `json:"polecat"` + + // Rig is the rig name. + Rig string `json:"rig"` + + // FailedAt is when the failure occurred. + FailedAt time.Time `json:"failed_at"` + + // FailureType categorizes the failure (tests, build, push, etc.). + FailureType string `json:"failure_type"` + + // Error is the error message. + Error string `json:"error"` + + // TargetBranch is the branch we tried to merge into. + TargetBranch string `json:"target_branch"` +} + +// ReworkRequestPayload contains the data for a REWORK_REQUEST message. +// Sent by Refinery when a polecat's branch has conflicts requiring rebase. +type ReworkRequestPayload struct { + // Branch is the source branch that needs rebasing. + Branch string `json:"branch"` + + // Issue is the beads issue ID. + Issue string `json:"issue"` + + // Polecat is the worker name. + Polecat string `json:"polecat"` + + // Rig is the rig name. + Rig string `json:"rig"` + + // RequestedAt is when the rework was requested. + RequestedAt time.Time `json:"requested_at"` + + // TargetBranch is the branch to rebase onto. + TargetBranch string `json:"target_branch"` + + // ConflictFiles lists files with conflicts (if known). + ConflictFiles []string `json:"conflict_files,omitempty"` + + // Instructions provides specific rebase instructions. + Instructions string `json:"instructions,omitempty"` +} + +// IsProtocolMessage returns true if the subject matches a known protocol type. +func IsProtocolMessage(subject string) bool { + return ParseMessageType(subject) != "" +} + +// ExtractPolecat extracts the polecat name from a protocol message subject. +// Subject format: "TYPE " +func ExtractPolecat(subject string) string { + subject = strings.TrimSpace(subject) + parts := strings.SplitN(subject, " ", 2) + if len(parts) < 2 { + return "" + } + return strings.TrimSpace(parts[1]) +} diff --git a/internal/protocol/witness_handlers.go b/internal/protocol/witness_handlers.go new file mode 100644 index 00000000..4599675d --- /dev/null +++ b/internal/protocol/witness_handlers.go @@ -0,0 +1,209 @@ +package protocol + +import ( + "fmt" + "io" + "os" + + "github.com/steveyegge/gastown/internal/mail" +) + +// DefaultWitnessHandler provides the default implementation for Witness protocol handlers. +// It receives messages from the Refinery about merge outcomes and takes appropriate action. +type DefaultWitnessHandler struct { + // Rig is the name of the rig this witness manages. + Rig string + + // WorkDir is the working directory for operations. + WorkDir string + + // Router is used to send mail messages. + Router *mail.Router + + // Output is where to write status messages. + Output io.Writer +} + +// NewWitnessHandler creates a new DefaultWitnessHandler. +func NewWitnessHandler(rig, workDir string) *DefaultWitnessHandler { + return &DefaultWitnessHandler{ + Rig: rig, + WorkDir: workDir, + Router: mail.NewRouter(workDir), + Output: os.Stdout, + } +} + +// SetOutput sets the output writer for status messages. +func (h *DefaultWitnessHandler) SetOutput(w io.Writer) { + h.Output = w +} + +// HandleMerged handles a MERGED message from Refinery. +// When a branch is successfully merged, the Witness: +// 1. Logs the success +// 2. Notifies the polecat of successful merge +// 3. Initiates polecat cleanup (nuke worktree) +func (h *DefaultWitnessHandler) HandleMerged(payload *MergedPayload) error { + fmt.Fprintf(h.Output, "[Witness] MERGED received for polecat %s\n", payload.Polecat) + fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch) + fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue) + fmt.Fprintf(h.Output, " Merged to: %s\n", payload.TargetBranch) + if payload.MergeCommit != "" { + fmt.Fprintf(h.Output, " Commit: %s\n", payload.MergeCommit) + } + + // Notify the polecat about successful merge + if err := h.notifyPolecatMerged(payload); err != nil { + fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err) + // Continue - notification is best-effort + } + + // Initiate polecat cleanup + // Note: Actual cleanup is done by a separate process/molecule + // This handler just records that cleanup is needed + fmt.Fprintf(h.Output, "[Witness] ✓ Polecat %s work merged, cleanup can proceed\n", payload.Polecat) + + return nil +} + +// HandleMergeFailed handles a MERGE_FAILED message from Refinery. +// When a merge fails (tests, build, etc.), the Witness: +// 1. Logs the failure +// 2. Notifies the polecat about the failure and required fixes +// 3. Updates the polecat's state to indicate rework needed +func (h *DefaultWitnessHandler) HandleMergeFailed(payload *MergeFailedPayload) error { + fmt.Fprintf(h.Output, "[Witness] MERGE_FAILED received for polecat %s\n", payload.Polecat) + fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch) + fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue) + fmt.Fprintf(h.Output, " Failure type: %s\n", payload.FailureType) + fmt.Fprintf(h.Output, " Error: %s\n", payload.Error) + + // Notify the polecat about the failure + if err := h.notifyPolecatFailed(payload); err != nil { + fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err) + // Continue - notification is best-effort + } + + fmt.Fprintf(h.Output, "[Witness] ✗ Polecat %s merge failed, rework needed\n", payload.Polecat) + + return nil +} + +// HandleReworkRequest handles a REWORK_REQUEST message from Refinery. +// When a branch has conflicts requiring rebase, the Witness: +// 1. Logs the conflict +// 2. Notifies the polecat with rebase instructions +// 3. Updates the polecat's state to indicate rebase needed +func (h *DefaultWitnessHandler) HandleReworkRequest(payload *ReworkRequestPayload) error { + fmt.Fprintf(h.Output, "[Witness] REWORK_REQUEST received for polecat %s\n", payload.Polecat) + fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch) + fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue) + fmt.Fprintf(h.Output, " Target: %s\n", payload.TargetBranch) + if len(payload.ConflictFiles) > 0 { + fmt.Fprintf(h.Output, " Conflicts in: %v\n", payload.ConflictFiles) + } + + // Notify the polecat about the rebase requirement + if err := h.notifyPolecatRebase(payload); err != nil { + fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err) + // Continue - notification is best-effort + } + + fmt.Fprintf(h.Output, "[Witness] ⚠ Polecat %s needs to rebase onto %s\n", payload.Polecat, payload.TargetBranch) + + return nil +} + +// notifyPolecatMerged sends a merge success notification to a polecat. +func (h *DefaultWitnessHandler) notifyPolecatMerged(payload *MergedPayload) error { + msg := mail.NewMessage( + fmt.Sprintf("%s/witness", h.Rig), + fmt.Sprintf("%s/%s", h.Rig, payload.Polecat), + "Work merged successfully", + fmt.Sprintf(`Your work has been merged to %s. + +Branch: %s +Issue: %s +Commit: %s + +Thank you for your contribution! Your worktree will be cleaned up shortly.`, + payload.TargetBranch, + payload.Branch, + payload.Issue, + payload.MergeCommit, + ), + ) + msg.Priority = mail.PriorityNormal + + return h.Router.Send(msg) +} + +// notifyPolecatFailed sends a merge failure notification to a polecat. +func (h *DefaultWitnessHandler) notifyPolecatFailed(payload *MergeFailedPayload) error { + msg := mail.NewMessage( + fmt.Sprintf("%s/witness", h.Rig), + fmt.Sprintf("%s/%s", h.Rig, payload.Polecat), + fmt.Sprintf("Merge failed: %s", payload.FailureType), + fmt.Sprintf(`Your merge request failed. + +Branch: %s +Issue: %s +Failure: %s +Error: %s + +Please fix the issue and resubmit your work with 'gt done'.`, + payload.Branch, + payload.Issue, + payload.FailureType, + payload.Error, + ), + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeTask + + return h.Router.Send(msg) +} + +// notifyPolecatRebase sends a rebase request notification to a polecat. +func (h *DefaultWitnessHandler) notifyPolecatRebase(payload *ReworkRequestPayload) error { + conflictInfo := "" + if len(payload.ConflictFiles) > 0 { + conflictInfo = fmt.Sprintf("\nConflicting files:\n") + for _, f := range payload.ConflictFiles { + conflictInfo += fmt.Sprintf(" - %s\n", f) + } + } + + msg := mail.NewMessage( + fmt.Sprintf("%s/witness", h.Rig), + fmt.Sprintf("%s/%s", h.Rig, payload.Polecat), + "Rebase required - merge conflict", + fmt.Sprintf(`Your branch has conflicts with %s. + +Branch: %s +Issue: %s +%s +Please rebase your changes: + + git fetch origin + git rebase origin/%s + # Resolve any conflicts + git push -f + +Then run 'gt done' to resubmit for merge.`, + payload.TargetBranch, + payload.Branch, + payload.Issue, + conflictInfo, + payload.TargetBranch, + ), + ) + msg.Priority = mail.PriorityHigh + msg.Type = mail.TypeTask + + return h.Router.Send(msg) +} + +// Ensure DefaultWitnessHandler implements WitnessHandler. +var _ WitnessHandler = (*DefaultWitnessHandler)(nil)