Implement Witness-Refinery protocol handlers (gt-m5w4g.2)

Add internal/protocol/ package with handlers for:
- MERGE_READY (witness→refinery): worker done, branch ready
- MERGED (refinery→witness): merge succeeded, cleanup ok
- MERGE_FAILED (refinery→witness): merge failed, needs rework
- REWORK_REQUEST (refinery→witness): rebase needed due to conflicts

Package structure:
- types.go: Protocol message types and payload structs
- messages.go: Message builders and body parsers
- handlers.go: Handler interface and registry for dispatch
- witness_handlers.go: DefaultWitnessHandler implementation
- refinery_handlers.go: DefaultRefineryHandler implementation
- protocol_test.go: Comprehensive test coverage

Also updated docs/mail-protocol.md with:
- MERGE_FAILED and REWORK_REQUEST message type documentation
- Merge failure and rebase required flow diagrams
- Reference to internal/protocol/ package

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-30 10:42:13 -08:00
parent 9968aceddb
commit 96ffbf0188
7 changed files with 1428 additions and 2 deletions

View File

@@ -61,13 +61,73 @@ Verified: clean git state, issue closed
```
Branch: <branch>
Issue: <issue-id>
Polecat: <polecat-name>
Rig: <rig>
Target: <target-branch>
Merged-At: <timestamp>
Merge-Commit: <sha>
```
**Trigger**: Refinery sends after successful merge to main.
**Handler**: Witness completes cleanup wisp, nukes polecat worktree.
### MERGE_FAILED
**Route**: Refinery → Witness
**Purpose**: Notify that merge attempt failed (tests, build, or other non-conflict error).
**Subject format**: `MERGE_FAILED <polecat-name>`
**Body format**:
```
Branch: <branch>
Issue: <issue-id>
Polecat: <polecat-name>
Rig: <rig>
Target: <target-branch>
Failed-At: <timestamp>
Failure-Type: <tests|build|push|other>
Error: <error-message>
```
**Trigger**: Refinery sends when merge fails for non-conflict reasons.
**Handler**: Witness notifies polecat, assigns work back for rework.
### REWORK_REQUEST
**Route**: Refinery → Witness
**Purpose**: Request polecat to rebase branch due to merge conflicts.
**Subject format**: `REWORK_REQUEST <polecat-name>`
**Body format**:
```
Branch: <branch>
Issue: <issue-id>
Polecat: <polecat-name>
Rig: <rig>
Target: <target-branch>
Requested-At: <timestamp>
Conflict-Files: <file1>, <file2>, ...
Please rebase your changes onto <target-branch>:
git fetch origin
git rebase origin/<target-branch>
# Resolve any conflicts
git push -f
The Refinery will retry the merge after rebase is complete.
```
**Trigger**: Refinery sends when merge has conflicts with target branch.
**Handler**: Witness notifies polecat with rebase instructions.
### WITNESS_PING
**Route**: Witness → Deacon (all witnesses send)
@@ -184,15 +244,52 @@ Polecat Witness Refinery
│ │ MERGE_READY │
│ │─────────────────────────>│
│ │ │
│ │ (merge to main)
│ │ (merge attempt)
│ │ │
│ │ MERGED
│ │ MERGED (success)
│ │<─────────────────────────│
│ │ │
│ (nuke polecat) │
│ │ │
```
### Merge Failure Flow
```
Witness Refinery
│ │
│ (merge fails)
│ │
│ MERGE_FAILED │
┌──────────────────────────│<─────────────────────────│
│ │ │
│ (failure notification) │ │
│<─────────────────────────│ │
│ │ │
Polecat (rework needed)
```
### Rebase Required Flow
```
Witness Refinery
│ │
│ (conflict detected)
│ │
│ REWORK_REQUEST │
┌──────────────────────────│<─────────────────────────│
│ │ │
│ (rebase instructions) │ │
│<─────────────────────────│ │
│ │ │
Polecat │ │
│ │ │
│ (rebases, gt done) │ │
│─────────────────────────>│ MERGE_READY │
│ │─────────────────────────>│
│ │ (retry merge)
```
### Second-Order Monitoring
```
@@ -261,3 +358,4 @@ flexible enough for human debugging.
- `docs/agent-as-bead.md` - Agent identity and slots
- `.beads/formulas/mol-witness-patrol.formula.toml` - Witness handling
- `internal/mail/` - Mail routing implementation
- `internal/protocol/` - Protocol handlers for Witness-Refinery communication

View File

@@ -0,0 +1,124 @@
package protocol
import (
"fmt"
"github.com/steveyegge/gastown/internal/mail"
)
// Handler processes a protocol message and returns an error if processing failed.
type Handler func(msg *mail.Message) error
// HandlerRegistry maps message types to their handlers.
type HandlerRegistry struct {
handlers map[MessageType]Handler
}
// NewHandlerRegistry creates a new handler registry.
func NewHandlerRegistry() *HandlerRegistry {
return &HandlerRegistry{
handlers: make(map[MessageType]Handler),
}
}
// Register adds a handler for a specific message type.
func (r *HandlerRegistry) Register(msgType MessageType, handler Handler) {
r.handlers[msgType] = handler
}
// Handle dispatches a message to the appropriate handler.
// Returns an error if no handler is registered for the message type.
func (r *HandlerRegistry) Handle(msg *mail.Message) error {
msgType := ParseMessageType(msg.Subject)
if msgType == "" {
return fmt.Errorf("unknown message type for subject: %s", msg.Subject)
}
handler, ok := r.handlers[msgType]
if !ok {
return fmt.Errorf("no handler registered for message type: %s", msgType)
}
return handler(msg)
}
// CanHandle returns true if a handler is registered for the message's type.
func (r *HandlerRegistry) CanHandle(msg *mail.Message) bool {
msgType := ParseMessageType(msg.Subject)
if msgType == "" {
return false
}
_, ok := r.handlers[msgType]
return ok
}
// WitnessHandler defines the interface for Witness protocol handlers.
// The Witness receives messages from Refinery about merge status.
type WitnessHandler interface {
// HandleMerged is called when a branch was successfully merged.
HandleMerged(payload *MergedPayload) error
// HandleMergeFailed is called when a merge attempt failed.
HandleMergeFailed(payload *MergeFailedPayload) error
// HandleReworkRequest is called when a branch needs rebasing.
HandleReworkRequest(payload *ReworkRequestPayload) error
}
// RefineryHandler defines the interface for Refinery protocol handlers.
// The Refinery receives messages from Witness about ready branches.
type RefineryHandler interface {
// HandleMergeReady is called when a polecat's work is verified and ready.
HandleMergeReady(payload *MergeReadyPayload) error
}
// WrapWitnessHandlers creates mail handlers from a WitnessHandler.
func WrapWitnessHandlers(h WitnessHandler) *HandlerRegistry {
registry := NewHandlerRegistry()
registry.Register(TypeMerged, func(msg *mail.Message) error {
payload := ParseMergedPayload(msg.Body)
return h.HandleMerged(payload)
})
registry.Register(TypeMergeFailed, func(msg *mail.Message) error {
payload := ParseMergeFailedPayload(msg.Body)
return h.HandleMergeFailed(payload)
})
registry.Register(TypeReworkRequest, func(msg *mail.Message) error {
payload := ParseReworkRequestPayload(msg.Body)
return h.HandleReworkRequest(payload)
})
return registry
}
// WrapRefineryHandlers creates mail handlers from a RefineryHandler.
func WrapRefineryHandlers(h RefineryHandler) *HandlerRegistry {
registry := NewHandlerRegistry()
registry.Register(TypeMergeReady, func(msg *mail.Message) error {
payload := ParseMergeReadyPayload(msg.Body)
return h.HandleMergeReady(payload)
})
return registry
}
// ProcessProtocolMessage processes a protocol message using the registry.
// It returns (true, nil) if the message was handled successfully,
// (true, error) if handling failed, or (false, nil) if not a protocol message.
func (r *HandlerRegistry) ProcessProtocolMessage(msg *mail.Message) (bool, error) {
if !IsProtocolMessage(msg.Subject) {
return false, nil
}
if !r.CanHandle(msg) {
return false, nil
}
err := r.Handle(msg)
return true, err
}

View File

@@ -0,0 +1,288 @@
package protocol
import (
"fmt"
"strings"
"time"
"github.com/steveyegge/gastown/internal/mail"
)
// NewMergeReadyMessage creates a MERGE_READY protocol message.
// Sent by Witness to Refinery when a polecat's work is verified and ready.
func NewMergeReadyMessage(rig, polecat, branch, issue string) *mail.Message {
payload := MergeReadyPayload{
Branch: branch,
Issue: issue,
Polecat: polecat,
Rig: rig,
Verified: "clean git state, issue closed",
Timestamp: time.Now(),
}
body := formatMergeReadyBody(payload)
msg := mail.NewMessage(
fmt.Sprintf("%s/witness", rig),
fmt.Sprintf("%s/refinery", rig),
fmt.Sprintf("MERGE_READY %s", polecat),
body,
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeTask
return msg
}
// formatMergeReadyBody formats the body of a MERGE_READY message.
func formatMergeReadyBody(p MergeReadyPayload) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch))
sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue))
sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat))
sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig))
if p.Verified != "" {
sb.WriteString(fmt.Sprintf("Verified: %s\n", p.Verified))
}
return sb.String()
}
// NewMergedMessage creates a MERGED protocol message.
// Sent by Refinery to Witness when a branch is successfully merged.
func NewMergedMessage(rig, polecat, branch, issue, targetBranch, mergeCommit string) *mail.Message {
payload := MergedPayload{
Branch: branch,
Issue: issue,
Polecat: polecat,
Rig: rig,
MergedAt: time.Now(),
MergeCommit: mergeCommit,
TargetBranch: targetBranch,
}
body := formatMergedBody(payload)
msg := mail.NewMessage(
fmt.Sprintf("%s/refinery", rig),
fmt.Sprintf("%s/witness", rig),
fmt.Sprintf("MERGED %s", polecat),
body,
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeNotification
return msg
}
// formatMergedBody formats the body of a MERGED message.
func formatMergedBody(p MergedPayload) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch))
sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue))
sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat))
sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig))
sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch))
sb.WriteString(fmt.Sprintf("Merged-At: %s\n", p.MergedAt.Format(time.RFC3339)))
if p.MergeCommit != "" {
sb.WriteString(fmt.Sprintf("Merge-Commit: %s\n", p.MergeCommit))
}
return sb.String()
}
// NewMergeFailedMessage creates a MERGE_FAILED protocol message.
// Sent by Refinery to Witness when merge fails (tests, build, etc.).
func NewMergeFailedMessage(rig, polecat, branch, issue, targetBranch, failureType, errorMsg string) *mail.Message {
payload := MergeFailedPayload{
Branch: branch,
Issue: issue,
Polecat: polecat,
Rig: rig,
FailedAt: time.Now(),
FailureType: failureType,
Error: errorMsg,
TargetBranch: targetBranch,
}
body := formatMergeFailedBody(payload)
msg := mail.NewMessage(
fmt.Sprintf("%s/refinery", rig),
fmt.Sprintf("%s/witness", rig),
fmt.Sprintf("MERGE_FAILED %s", polecat),
body,
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeTask
return msg
}
// formatMergeFailedBody formats the body of a MERGE_FAILED message.
func formatMergeFailedBody(p MergeFailedPayload) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch))
sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue))
sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat))
sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig))
sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch))
sb.WriteString(fmt.Sprintf("Failed-At: %s\n", p.FailedAt.Format(time.RFC3339)))
sb.WriteString(fmt.Sprintf("Failure-Type: %s\n", p.FailureType))
sb.WriteString(fmt.Sprintf("Error: %s\n", p.Error))
return sb.String()
}
// NewReworkRequestMessage creates a REWORK_REQUEST protocol message.
// Sent by Refinery to Witness when a branch needs rebasing due to conflicts.
func NewReworkRequestMessage(rig, polecat, branch, issue, targetBranch string, conflictFiles []string) *mail.Message {
payload := ReworkRequestPayload{
Branch: branch,
Issue: issue,
Polecat: polecat,
Rig: rig,
RequestedAt: time.Now(),
TargetBranch: targetBranch,
ConflictFiles: conflictFiles,
Instructions: formatRebaseInstructions(targetBranch),
}
body := formatReworkRequestBody(payload)
msg := mail.NewMessage(
fmt.Sprintf("%s/refinery", rig),
fmt.Sprintf("%s/witness", rig),
fmt.Sprintf("REWORK_REQUEST %s", polecat),
body,
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeTask
return msg
}
// formatReworkRequestBody formats the body of a REWORK_REQUEST message.
func formatReworkRequestBody(p ReworkRequestPayload) string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf("Branch: %s\n", p.Branch))
sb.WriteString(fmt.Sprintf("Issue: %s\n", p.Issue))
sb.WriteString(fmt.Sprintf("Polecat: %s\n", p.Polecat))
sb.WriteString(fmt.Sprintf("Rig: %s\n", p.Rig))
sb.WriteString(fmt.Sprintf("Target: %s\n", p.TargetBranch))
sb.WriteString(fmt.Sprintf("Requested-At: %s\n", p.RequestedAt.Format(time.RFC3339)))
if len(p.ConflictFiles) > 0 {
sb.WriteString(fmt.Sprintf("Conflict-Files: %s\n", strings.Join(p.ConflictFiles, ", ")))
}
sb.WriteString("\n")
sb.WriteString(p.Instructions)
return sb.String()
}
// formatRebaseInstructions returns standard rebase instructions.
func formatRebaseInstructions(targetBranch string) string {
return fmt.Sprintf(`Please rebase your changes onto %s:
git fetch origin
git rebase origin/%s
# Resolve any conflicts
git push -f
The Refinery will retry the merge after rebase is complete.`, targetBranch, targetBranch)
}
// ParseMergeReadyPayload parses a MERGE_READY message body into a payload.
func ParseMergeReadyPayload(body string) *MergeReadyPayload {
return &MergeReadyPayload{
Branch: parseField(body, "Branch"),
Issue: parseField(body, "Issue"),
Polecat: parseField(body, "Polecat"),
Rig: parseField(body, "Rig"),
Verified: parseField(body, "Verified"),
Timestamp: time.Now(), // Use current time if not parseable
}
}
// ParseMergedPayload parses a MERGED message body into a payload.
func ParseMergedPayload(body string) *MergedPayload {
payload := &MergedPayload{
Branch: parseField(body, "Branch"),
Issue: parseField(body, "Issue"),
Polecat: parseField(body, "Polecat"),
Rig: parseField(body, "Rig"),
TargetBranch: parseField(body, "Target"),
MergeCommit: parseField(body, "Merge-Commit"),
}
// Parse timestamp
if ts := parseField(body, "Merged-At"); ts != "" {
if t, err := time.Parse(time.RFC3339, ts); err == nil {
payload.MergedAt = t
}
}
return payload
}
// ParseMergeFailedPayload parses a MERGE_FAILED message body into a payload.
func ParseMergeFailedPayload(body string) *MergeFailedPayload {
payload := &MergeFailedPayload{
Branch: parseField(body, "Branch"),
Issue: parseField(body, "Issue"),
Polecat: parseField(body, "Polecat"),
Rig: parseField(body, "Rig"),
TargetBranch: parseField(body, "Target"),
FailureType: parseField(body, "Failure-Type"),
Error: parseField(body, "Error"),
}
// Parse timestamp
if ts := parseField(body, "Failed-At"); ts != "" {
if t, err := time.Parse(time.RFC3339, ts); err == nil {
payload.FailedAt = t
}
}
return payload
}
// ParseReworkRequestPayload parses a REWORK_REQUEST message body into a payload.
func ParseReworkRequestPayload(body string) *ReworkRequestPayload {
payload := &ReworkRequestPayload{
Branch: parseField(body, "Branch"),
Issue: parseField(body, "Issue"),
Polecat: parseField(body, "Polecat"),
Rig: parseField(body, "Rig"),
TargetBranch: parseField(body, "Target"),
}
// Parse timestamp
if ts := parseField(body, "Requested-At"); ts != "" {
if t, err := time.Parse(time.RFC3339, ts); err == nil {
payload.RequestedAt = t
}
}
// Parse conflict files
if files := parseField(body, "Conflict-Files"); files != "" {
payload.ConflictFiles = strings.Split(files, ", ")
}
return payload
}
// parseField extracts a field value from a key-value body format.
// Format: "Key: value"
func parseField(body, key string) string {
lines := strings.Split(body, "\n")
prefix := key + ": "
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, prefix) {
return strings.TrimPrefix(line, prefix)
}
}
return ""
}

View File

@@ -0,0 +1,378 @@
package protocol
import (
"bytes"
"strings"
"testing"
"time"
"github.com/steveyegge/gastown/internal/mail"
)
func TestParseMessageType(t *testing.T) {
tests := []struct {
subject string
expected MessageType
}{
{"MERGE_READY nux", TypeMergeReady},
{"MERGED Toast", TypeMerged},
{"MERGE_FAILED ace", TypeMergeFailed},
{"REWORK_REQUEST valkyrie", TypeReworkRequest},
{"MERGE_READY", TypeMergeReady}, // no polecat name
{"Unknown subject", ""},
{"", ""},
{" MERGE_READY nux ", TypeMergeReady}, // with whitespace
}
for _, tt := range tests {
t.Run(tt.subject, func(t *testing.T) {
result := ParseMessageType(tt.subject)
if result != tt.expected {
t.Errorf("ParseMessageType(%q) = %q, want %q", tt.subject, result, tt.expected)
}
})
}
}
func TestExtractPolecat(t *testing.T) {
tests := []struct {
subject string
expected string
}{
{"MERGE_READY nux", "nux"},
{"MERGED Toast", "Toast"},
{"MERGE_FAILED ace", "ace"},
{"REWORK_REQUEST valkyrie", "valkyrie"},
{"MERGE_READY", ""},
{"", ""},
{" MERGE_READY nux ", "nux"},
}
for _, tt := range tests {
t.Run(tt.subject, func(t *testing.T) {
result := ExtractPolecat(tt.subject)
if result != tt.expected {
t.Errorf("ExtractPolecat(%q) = %q, want %q", tt.subject, result, tt.expected)
}
})
}
}
func TestIsProtocolMessage(t *testing.T) {
tests := []struct {
subject string
expected bool
}{
{"MERGE_READY nux", true},
{"MERGED Toast", true},
{"MERGE_FAILED ace", true},
{"REWORK_REQUEST valkyrie", true},
{"Unknown subject", false},
{"", false},
{"Hello world", false},
}
for _, tt := range tests {
t.Run(tt.subject, func(t *testing.T) {
result := IsProtocolMessage(tt.subject)
if result != tt.expected {
t.Errorf("IsProtocolMessage(%q) = %v, want %v", tt.subject, result, tt.expected)
}
})
}
}
func TestNewMergeReadyMessage(t *testing.T) {
msg := NewMergeReadyMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc")
if msg.Subject != "MERGE_READY nux" {
t.Errorf("Subject = %q, want %q", msg.Subject, "MERGE_READY nux")
}
if msg.From != "gastown/witness" {
t.Errorf("From = %q, want %q", msg.From, "gastown/witness")
}
if msg.To != "gastown/refinery" {
t.Errorf("To = %q, want %q", msg.To, "gastown/refinery")
}
if msg.Priority != mail.PriorityHigh {
t.Errorf("Priority = %q, want %q", msg.Priority, mail.PriorityHigh)
}
if !strings.Contains(msg.Body, "Branch: polecat/nux/gt-abc") {
t.Errorf("Body missing branch: %s", msg.Body)
}
if !strings.Contains(msg.Body, "Issue: gt-abc") {
t.Errorf("Body missing issue: %s", msg.Body)
}
}
func TestNewMergedMessage(t *testing.T) {
msg := NewMergedMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", "abc123")
if msg.Subject != "MERGED nux" {
t.Errorf("Subject = %q, want %q", msg.Subject, "MERGED nux")
}
if msg.From != "gastown/refinery" {
t.Errorf("From = %q, want %q", msg.From, "gastown/refinery")
}
if msg.To != "gastown/witness" {
t.Errorf("To = %q, want %q", msg.To, "gastown/witness")
}
if !strings.Contains(msg.Body, "Merge-Commit: abc123") {
t.Errorf("Body missing merge commit: %s", msg.Body)
}
}
func TestNewMergeFailedMessage(t *testing.T) {
msg := NewMergeFailedMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", "tests", "Test failed")
if msg.Subject != "MERGE_FAILED nux" {
t.Errorf("Subject = %q, want %q", msg.Subject, "MERGE_FAILED nux")
}
if !strings.Contains(msg.Body, "Failure-Type: tests") {
t.Errorf("Body missing failure type: %s", msg.Body)
}
if !strings.Contains(msg.Body, "Error: Test failed") {
t.Errorf("Body missing error: %s", msg.Body)
}
}
func TestNewReworkRequestMessage(t *testing.T) {
conflicts := []string{"file1.go", "file2.go"}
msg := NewReworkRequestMessage("gastown", "nux", "polecat/nux/gt-abc", "gt-abc", "main", conflicts)
if msg.Subject != "REWORK_REQUEST nux" {
t.Errorf("Subject = %q, want %q", msg.Subject, "REWORK_REQUEST nux")
}
if !strings.Contains(msg.Body, "Conflict-Files: file1.go, file2.go") {
t.Errorf("Body missing conflict files: %s", msg.Body)
}
if !strings.Contains(msg.Body, "git rebase origin/main") {
t.Errorf("Body missing rebase instructions: %s", msg.Body)
}
}
func TestParseMergeReadyPayload(t *testing.T) {
body := `Branch: polecat/nux/gt-abc
Issue: gt-abc
Polecat: nux
Rig: gastown
Verified: clean git state`
payload := ParseMergeReadyPayload(body)
if payload.Branch != "polecat/nux/gt-abc" {
t.Errorf("Branch = %q, want %q", payload.Branch, "polecat/nux/gt-abc")
}
if payload.Issue != "gt-abc" {
t.Errorf("Issue = %q, want %q", payload.Issue, "gt-abc")
}
if payload.Polecat != "nux" {
t.Errorf("Polecat = %q, want %q", payload.Polecat, "nux")
}
if payload.Rig != "gastown" {
t.Errorf("Rig = %q, want %q", payload.Rig, "gastown")
}
}
func TestParseMergedPayload(t *testing.T) {
ts := time.Now().Format(time.RFC3339)
body := `Branch: polecat/nux/gt-abc
Issue: gt-abc
Polecat: nux
Rig: gastown
Target: main
Merged-At: ` + ts + `
Merge-Commit: abc123`
payload := ParseMergedPayload(body)
if payload.Branch != "polecat/nux/gt-abc" {
t.Errorf("Branch = %q, want %q", payload.Branch, "polecat/nux/gt-abc")
}
if payload.MergeCommit != "abc123" {
t.Errorf("MergeCommit = %q, want %q", payload.MergeCommit, "abc123")
}
if payload.TargetBranch != "main" {
t.Errorf("TargetBranch = %q, want %q", payload.TargetBranch, "main")
}
}
func TestHandlerRegistry(t *testing.T) {
registry := NewHandlerRegistry()
handled := false
registry.Register(TypeMergeReady, func(msg *mail.Message) error {
handled = true
return nil
})
msg := &mail.Message{Subject: "MERGE_READY nux"}
if !registry.CanHandle(msg) {
t.Error("Registry should be able to handle MERGE_READY message")
}
if err := registry.Handle(msg); err != nil {
t.Errorf("Handle returned error: %v", err)
}
if !handled {
t.Error("Handler was not called")
}
// Test unregistered message type
unknownMsg := &mail.Message{Subject: "UNKNOWN message"}
if registry.CanHandle(unknownMsg) {
t.Error("Registry should not handle unknown message type")
}
}
func TestWrapWitnessHandlers(t *testing.T) {
handler := &mockWitnessHandler{}
registry := WrapWitnessHandlers(handler)
// Test MERGED
mergedMsg := &mail.Message{
Subject: "MERGED nux",
Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main",
}
if err := registry.Handle(mergedMsg); err != nil {
t.Errorf("HandleMerged error: %v", err)
}
if !handler.mergedCalled {
t.Error("HandleMerged was not called")
}
// Test MERGE_FAILED
failedMsg := &mail.Message{
Subject: "MERGE_FAILED nux",
Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main\nFailure-Type: tests\nError: failed",
}
if err := registry.Handle(failedMsg); err != nil {
t.Errorf("HandleMergeFailed error: %v", err)
}
if !handler.failedCalled {
t.Error("HandleMergeFailed was not called")
}
// Test REWORK_REQUEST
reworkMsg := &mail.Message{
Subject: "REWORK_REQUEST nux",
Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown\nTarget: main",
}
if err := registry.Handle(reworkMsg); err != nil {
t.Errorf("HandleReworkRequest error: %v", err)
}
if !handler.reworkCalled {
t.Error("HandleReworkRequest was not called")
}
}
func TestWrapRefineryHandlers(t *testing.T) {
handler := &mockRefineryHandler{}
registry := WrapRefineryHandlers(handler)
msg := &mail.Message{
Subject: "MERGE_READY nux",
Body: "Branch: polecat/nux\nIssue: gt-abc\nPolecat: nux\nRig: gastown",
}
if err := registry.Handle(msg); err != nil {
t.Errorf("HandleMergeReady error: %v", err)
}
if !handler.readyCalled {
t.Error("HandleMergeReady was not called")
}
}
func TestDefaultWitnessHandler(t *testing.T) {
tmpDir := t.TempDir()
handler := NewWitnessHandler("gastown", tmpDir)
// Capture output
var buf bytes.Buffer
handler.SetOutput(&buf)
// Test HandleMerged
mergedPayload := &MergedPayload{
Branch: "polecat/nux/gt-abc",
Issue: "gt-abc",
Polecat: "nux",
Rig: "gastown",
TargetBranch: "main",
MergeCommit: "abc123",
}
if err := handler.HandleMerged(mergedPayload); err != nil {
t.Errorf("HandleMerged error: %v", err)
}
if !strings.Contains(buf.String(), "MERGED received") {
t.Errorf("Output missing expected text: %s", buf.String())
}
// Test HandleMergeFailed
buf.Reset()
failedPayload := &MergeFailedPayload{
Branch: "polecat/nux/gt-abc",
Issue: "gt-abc",
Polecat: "nux",
Rig: "gastown",
TargetBranch: "main",
FailureType: "tests",
Error: "Test failed",
}
if err := handler.HandleMergeFailed(failedPayload); err != nil {
t.Errorf("HandleMergeFailed error: %v", err)
}
if !strings.Contains(buf.String(), "MERGE_FAILED received") {
t.Errorf("Output missing expected text: %s", buf.String())
}
// Test HandleReworkRequest
buf.Reset()
reworkPayload := &ReworkRequestPayload{
Branch: "polecat/nux/gt-abc",
Issue: "gt-abc",
Polecat: "nux",
Rig: "gastown",
TargetBranch: "main",
ConflictFiles: []string{"file1.go"},
}
if err := handler.HandleReworkRequest(reworkPayload); err != nil {
t.Errorf("HandleReworkRequest error: %v", err)
}
if !strings.Contains(buf.String(), "REWORK_REQUEST received") {
t.Errorf("Output missing expected text: %s", buf.String())
}
}
// Mock handlers for testing
type mockWitnessHandler struct {
mergedCalled bool
failedCalled bool
reworkCalled bool
}
func (m *mockWitnessHandler) HandleMerged(payload *MergedPayload) error {
m.mergedCalled = true
return nil
}
func (m *mockWitnessHandler) HandleMergeFailed(payload *MergeFailedPayload) error {
m.failedCalled = true
return nil
}
func (m *mockWitnessHandler) HandleReworkRequest(payload *ReworkRequestPayload) error {
m.reworkCalled = true
return nil
}
type mockRefineryHandler struct {
readyCalled bool
}
func (m *mockRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error {
m.readyCalled = true
return nil
}

View File

@@ -0,0 +1,147 @@
package protocol
import (
"fmt"
"io"
"os"
"time"
"github.com/steveyegge/gastown/internal/mail"
"github.com/steveyegge/gastown/internal/mrqueue"
)
// DefaultRefineryHandler provides the default implementation for Refinery protocol handlers.
// It receives MERGE_READY messages from the Witness and adds work to the merge queue.
type DefaultRefineryHandler struct {
// Rig is the name of the rig this refinery processes.
Rig string
// WorkDir is the working directory for operations.
WorkDir string
// Queue is the merge request queue.
Queue *mrqueue.Queue
// Router is used to send mail messages.
Router *mail.Router
// Output is where to write status messages.
Output io.Writer
}
// NewRefineryHandler creates a new DefaultRefineryHandler.
func NewRefineryHandler(rig, workDir string) *DefaultRefineryHandler {
return &DefaultRefineryHandler{
Rig: rig,
WorkDir: workDir,
Queue: mrqueue.New(workDir),
Router: mail.NewRouter(workDir),
Output: os.Stdout,
}
}
// SetOutput sets the output writer for status messages.
func (h *DefaultRefineryHandler) SetOutput(w io.Writer) {
h.Output = w
}
// HandleMergeReady handles a MERGE_READY message from Witness.
// When a polecat's work is verified and ready, the Refinery:
// 1. Validates the merge request
// 2. Adds it to the merge queue
// 3. Acknowledges receipt
func (h *DefaultRefineryHandler) HandleMergeReady(payload *MergeReadyPayload) error {
fmt.Fprintf(h.Output, "[Refinery] MERGE_READY received for polecat %s\n", payload.Polecat)
fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch)
fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue)
fmt.Fprintf(h.Output, " Verified: %s\n", payload.Verified)
// Validate required fields
if payload.Branch == "" {
return fmt.Errorf("missing branch in MERGE_READY payload")
}
if payload.Polecat == "" {
return fmt.Errorf("missing polecat in MERGE_READY payload")
}
// Create merge request (ID is generated by Submit if empty)
mr := &mrqueue.MR{
Branch: payload.Branch,
Worker: payload.Polecat,
SourceIssue: payload.Issue,
Target: "main", // Default target, could be passed in payload
Rig: payload.Rig,
Title: fmt.Sprintf("Merge %s work on %s", payload.Polecat, payload.Issue),
CreatedAt: time.Now(),
}
// Add to queue
if err := h.Queue.Submit(mr); err != nil {
fmt.Fprintf(h.Output, "[Refinery] Error adding to queue: %v\n", err)
return fmt.Errorf("failed to add merge request to queue: %w", err)
}
fmt.Fprintf(h.Output, "[Refinery] ✓ Added to merge queue: %s\n", mr.ID)
fmt.Fprintf(h.Output, " Queue length: %d\n", h.Queue.Count())
return nil
}
// SendMerged sends a MERGED message to the Witness.
// Called by the Refinery after successfully merging a branch.
func (h *DefaultRefineryHandler) SendMerged(polecat, branch, issue, targetBranch, mergeCommit string) error {
msg := NewMergedMessage(h.Rig, polecat, branch, issue, targetBranch, mergeCommit)
return h.Router.Send(msg)
}
// SendMergeFailed sends a MERGE_FAILED message to the Witness.
// Called by the Refinery when a merge fails.
func (h *DefaultRefineryHandler) SendMergeFailed(polecat, branch, issue, targetBranch, failureType, errorMsg string) error {
msg := NewMergeFailedMessage(h.Rig, polecat, branch, issue, targetBranch, failureType, errorMsg)
return h.Router.Send(msg)
}
// SendReworkRequest sends a REWORK_REQUEST message to the Witness.
// Called by the Refinery when a branch has conflicts.
func (h *DefaultRefineryHandler) SendReworkRequest(polecat, branch, issue, targetBranch string, conflictFiles []string) error {
msg := NewReworkRequestMessage(h.Rig, polecat, branch, issue, targetBranch, conflictFiles)
return h.Router.Send(msg)
}
// NotifyMergeOutcome is a convenience method that sends the appropriate message
// based on the merge result.
type MergeOutcome struct {
// Success indicates whether the merge was successful.
Success bool
// Conflict indicates the failure was due to conflicts (needs rebase).
Conflict bool
// FailureType categorizes the failure (e.g., "tests", "build").
FailureType string
// Error is the error message if the merge failed.
Error string
// MergeCommit is the SHA of the merge commit on success.
MergeCommit string
// ConflictFiles lists files with conflicts (if Conflict is true).
ConflictFiles []string
}
// NotifyMergeOutcome sends the appropriate protocol message based on the outcome.
func (h *DefaultRefineryHandler) NotifyMergeOutcome(polecat, branch, issue, targetBranch string, outcome MergeOutcome) error {
if outcome.Success {
return h.SendMerged(polecat, branch, issue, targetBranch, outcome.MergeCommit)
}
if outcome.Conflict {
return h.SendReworkRequest(polecat, branch, issue, targetBranch, outcome.ConflictFiles)
}
return h.SendMergeFailed(polecat, branch, issue, targetBranch, outcome.FailureType, outcome.Error)
}
// Ensure DefaultRefineryHandler implements RefineryHandler.
var _ RefineryHandler = (*DefaultRefineryHandler)(nil)

182
internal/protocol/types.go Normal file
View File

@@ -0,0 +1,182 @@
// Package protocol provides inter-agent protocol message handling.
//
// This package defines protocol message types for Witness-Refinery communication
// and provides handlers for processing these messages.
//
// Protocol Message Types:
// - MERGE_READY: Witness → Refinery (branch ready for merge)
// - MERGED: Refinery → Witness (merge succeeded, cleanup ok)
// - MERGE_FAILED: Refinery → Witness (merge failed, needs rework)
// - REWORK_REQUEST: Refinery → Witness (rebase needed)
package protocol
import (
"strings"
"time"
)
// MessageType identifies the protocol message type.
type MessageType string
const (
// TypeMergeReady is sent from Witness to Refinery when a polecat's work
// is verified and ready for merge queue processing.
// Subject format: "MERGE_READY <polecat-name>"
TypeMergeReady MessageType = "MERGE_READY"
// TypeMerged is sent from Refinery to Witness when a branch has been
// successfully merged to the target branch.
// Subject format: "MERGED <polecat-name>"
TypeMerged MessageType = "MERGED"
// TypeMergeFailed is sent from Refinery to Witness when a merge attempt
// failed (tests, build, or other non-conflict error).
// Subject format: "MERGE_FAILED <polecat-name>"
TypeMergeFailed MessageType = "MERGE_FAILED"
// TypeReworkRequest is sent from Refinery to Witness when a polecat's
// branch needs rebasing due to conflicts with the target branch.
// Subject format: "REWORK_REQUEST <polecat-name>"
TypeReworkRequest MessageType = "REWORK_REQUEST"
)
// ParseMessageType extracts the protocol message type from a mail subject.
// Returns empty string if subject doesn't match a known protocol type.
func ParseMessageType(subject string) MessageType {
subject = strings.TrimSpace(subject)
// Check each known prefix
prefixes := []MessageType{
TypeMergeReady,
TypeMerged,
TypeMergeFailed,
TypeReworkRequest,
}
for _, prefix := range prefixes {
if strings.HasPrefix(subject, string(prefix)) {
return prefix
}
}
return ""
}
// MergeReadyPayload contains the data for a MERGE_READY message.
// Sent by Witness after verifying polecat work is complete.
type MergeReadyPayload struct {
// Branch is the polecat's work branch (e.g., "polecat/Toast/gt-abc").
Branch string `json:"branch"`
// Issue is the beads issue ID the polecat completed.
Issue string `json:"issue"`
// Polecat is the worker name.
Polecat string `json:"polecat"`
// Rig is the rig name containing the polecat.
Rig string `json:"rig"`
// Verified contains verification notes.
Verified string `json:"verified,omitempty"`
// Timestamp is when the message was created.
Timestamp time.Time `json:"timestamp"`
}
// MergedPayload contains the data for a MERGED message.
// Sent by Refinery after successful merge to target branch.
type MergedPayload struct {
// Branch is the source branch that was merged.
Branch string `json:"branch"`
// Issue is the beads issue ID.
Issue string `json:"issue"`
// Polecat is the worker name.
Polecat string `json:"polecat"`
// Rig is the rig name.
Rig string `json:"rig"`
// MergedAt is when the merge completed.
MergedAt time.Time `json:"merged_at"`
// MergeCommit is the SHA of the merge commit.
MergeCommit string `json:"merge_commit,omitempty"`
// TargetBranch is the branch merged into (e.g., "main").
TargetBranch string `json:"target_branch"`
}
// MergeFailedPayload contains the data for a MERGE_FAILED message.
// Sent by Refinery when merge fails due to tests, build, or other errors.
type MergeFailedPayload struct {
// Branch is the source branch that failed to merge.
Branch string `json:"branch"`
// Issue is the beads issue ID.
Issue string `json:"issue"`
// Polecat is the worker name.
Polecat string `json:"polecat"`
// Rig is the rig name.
Rig string `json:"rig"`
// FailedAt is when the failure occurred.
FailedAt time.Time `json:"failed_at"`
// FailureType categorizes the failure (tests, build, push, etc.).
FailureType string `json:"failure_type"`
// Error is the error message.
Error string `json:"error"`
// TargetBranch is the branch we tried to merge into.
TargetBranch string `json:"target_branch"`
}
// ReworkRequestPayload contains the data for a REWORK_REQUEST message.
// Sent by Refinery when a polecat's branch has conflicts requiring rebase.
type ReworkRequestPayload struct {
// Branch is the source branch that needs rebasing.
Branch string `json:"branch"`
// Issue is the beads issue ID.
Issue string `json:"issue"`
// Polecat is the worker name.
Polecat string `json:"polecat"`
// Rig is the rig name.
Rig string `json:"rig"`
// RequestedAt is when the rework was requested.
RequestedAt time.Time `json:"requested_at"`
// TargetBranch is the branch to rebase onto.
TargetBranch string `json:"target_branch"`
// ConflictFiles lists files with conflicts (if known).
ConflictFiles []string `json:"conflict_files,omitempty"`
// Instructions provides specific rebase instructions.
Instructions string `json:"instructions,omitempty"`
}
// IsProtocolMessage returns true if the subject matches a known protocol type.
func IsProtocolMessage(subject string) bool {
return ParseMessageType(subject) != ""
}
// ExtractPolecat extracts the polecat name from a protocol message subject.
// Subject format: "TYPE <polecat-name>"
func ExtractPolecat(subject string) string {
subject = strings.TrimSpace(subject)
parts := strings.SplitN(subject, " ", 2)
if len(parts) < 2 {
return ""
}
return strings.TrimSpace(parts[1])
}

View File

@@ -0,0 +1,209 @@
package protocol
import (
"fmt"
"io"
"os"
"github.com/steveyegge/gastown/internal/mail"
)
// DefaultWitnessHandler provides the default implementation for Witness protocol handlers.
// It receives messages from the Refinery about merge outcomes and takes appropriate action.
type DefaultWitnessHandler struct {
// Rig is the name of the rig this witness manages.
Rig string
// WorkDir is the working directory for operations.
WorkDir string
// Router is used to send mail messages.
Router *mail.Router
// Output is where to write status messages.
Output io.Writer
}
// NewWitnessHandler creates a new DefaultWitnessHandler.
func NewWitnessHandler(rig, workDir string) *DefaultWitnessHandler {
return &DefaultWitnessHandler{
Rig: rig,
WorkDir: workDir,
Router: mail.NewRouter(workDir),
Output: os.Stdout,
}
}
// SetOutput sets the output writer for status messages.
func (h *DefaultWitnessHandler) SetOutput(w io.Writer) {
h.Output = w
}
// HandleMerged handles a MERGED message from Refinery.
// When a branch is successfully merged, the Witness:
// 1. Logs the success
// 2. Notifies the polecat of successful merge
// 3. Initiates polecat cleanup (nuke worktree)
func (h *DefaultWitnessHandler) HandleMerged(payload *MergedPayload) error {
fmt.Fprintf(h.Output, "[Witness] MERGED received for polecat %s\n", payload.Polecat)
fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch)
fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue)
fmt.Fprintf(h.Output, " Merged to: %s\n", payload.TargetBranch)
if payload.MergeCommit != "" {
fmt.Fprintf(h.Output, " Commit: %s\n", payload.MergeCommit)
}
// Notify the polecat about successful merge
if err := h.notifyPolecatMerged(payload); err != nil {
fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err)
// Continue - notification is best-effort
}
// Initiate polecat cleanup
// Note: Actual cleanup is done by a separate process/molecule
// This handler just records that cleanup is needed
fmt.Fprintf(h.Output, "[Witness] ✓ Polecat %s work merged, cleanup can proceed\n", payload.Polecat)
return nil
}
// HandleMergeFailed handles a MERGE_FAILED message from Refinery.
// When a merge fails (tests, build, etc.), the Witness:
// 1. Logs the failure
// 2. Notifies the polecat about the failure and required fixes
// 3. Updates the polecat's state to indicate rework needed
func (h *DefaultWitnessHandler) HandleMergeFailed(payload *MergeFailedPayload) error {
fmt.Fprintf(h.Output, "[Witness] MERGE_FAILED received for polecat %s\n", payload.Polecat)
fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch)
fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue)
fmt.Fprintf(h.Output, " Failure type: %s\n", payload.FailureType)
fmt.Fprintf(h.Output, " Error: %s\n", payload.Error)
// Notify the polecat about the failure
if err := h.notifyPolecatFailed(payload); err != nil {
fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err)
// Continue - notification is best-effort
}
fmt.Fprintf(h.Output, "[Witness] ✗ Polecat %s merge failed, rework needed\n", payload.Polecat)
return nil
}
// HandleReworkRequest handles a REWORK_REQUEST message from Refinery.
// When a branch has conflicts requiring rebase, the Witness:
// 1. Logs the conflict
// 2. Notifies the polecat with rebase instructions
// 3. Updates the polecat's state to indicate rebase needed
func (h *DefaultWitnessHandler) HandleReworkRequest(payload *ReworkRequestPayload) error {
fmt.Fprintf(h.Output, "[Witness] REWORK_REQUEST received for polecat %s\n", payload.Polecat)
fmt.Fprintf(h.Output, " Branch: %s\n", payload.Branch)
fmt.Fprintf(h.Output, " Issue: %s\n", payload.Issue)
fmt.Fprintf(h.Output, " Target: %s\n", payload.TargetBranch)
if len(payload.ConflictFiles) > 0 {
fmt.Fprintf(h.Output, " Conflicts in: %v\n", payload.ConflictFiles)
}
// Notify the polecat about the rebase requirement
if err := h.notifyPolecatRebase(payload); err != nil {
fmt.Fprintf(h.Output, "[Witness] Warning: failed to notify polecat: %v\n", err)
// Continue - notification is best-effort
}
fmt.Fprintf(h.Output, "[Witness] ⚠ Polecat %s needs to rebase onto %s\n", payload.Polecat, payload.TargetBranch)
return nil
}
// notifyPolecatMerged sends a merge success notification to a polecat.
func (h *DefaultWitnessHandler) notifyPolecatMerged(payload *MergedPayload) error {
msg := mail.NewMessage(
fmt.Sprintf("%s/witness", h.Rig),
fmt.Sprintf("%s/%s", h.Rig, payload.Polecat),
"Work merged successfully",
fmt.Sprintf(`Your work has been merged to %s.
Branch: %s
Issue: %s
Commit: %s
Thank you for your contribution! Your worktree will be cleaned up shortly.`,
payload.TargetBranch,
payload.Branch,
payload.Issue,
payload.MergeCommit,
),
)
msg.Priority = mail.PriorityNormal
return h.Router.Send(msg)
}
// notifyPolecatFailed sends a merge failure notification to a polecat.
func (h *DefaultWitnessHandler) notifyPolecatFailed(payload *MergeFailedPayload) error {
msg := mail.NewMessage(
fmt.Sprintf("%s/witness", h.Rig),
fmt.Sprintf("%s/%s", h.Rig, payload.Polecat),
fmt.Sprintf("Merge failed: %s", payload.FailureType),
fmt.Sprintf(`Your merge request failed.
Branch: %s
Issue: %s
Failure: %s
Error: %s
Please fix the issue and resubmit your work with 'gt done'.`,
payload.Branch,
payload.Issue,
payload.FailureType,
payload.Error,
),
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeTask
return h.Router.Send(msg)
}
// notifyPolecatRebase sends a rebase request notification to a polecat.
func (h *DefaultWitnessHandler) notifyPolecatRebase(payload *ReworkRequestPayload) error {
conflictInfo := ""
if len(payload.ConflictFiles) > 0 {
conflictInfo = fmt.Sprintf("\nConflicting files:\n")
for _, f := range payload.ConflictFiles {
conflictInfo += fmt.Sprintf(" - %s\n", f)
}
}
msg := mail.NewMessage(
fmt.Sprintf("%s/witness", h.Rig),
fmt.Sprintf("%s/%s", h.Rig, payload.Polecat),
"Rebase required - merge conflict",
fmt.Sprintf(`Your branch has conflicts with %s.
Branch: %s
Issue: %s
%s
Please rebase your changes:
git fetch origin
git rebase origin/%s
# Resolve any conflicts
git push -f
Then run 'gt done' to resubmit for merge.`,
payload.TargetBranch,
payload.Branch,
payload.Issue,
conflictInfo,
payload.TargetBranch,
),
)
msg.Priority = mail.PriorityHigh
msg.Type = mail.TypeTask
return h.Router.Send(msg)
}
// Ensure DefaultWitnessHandler implements WitnessHandler.
var _ WitnessHandler = (*DefaultWitnessHandler)(nil)