feat(mail): extend message bead for queues/channels

Add queue/channel routing fields to message beads:
- queue: string (queue name, mutually exclusive with to/channel)
- channel: string (channel name, mutually exclusive with to/queue)
- claimed_by: string (who claimed queue message)
- claimed_at: timestamp (when claimed)

Messages can now be direct (To), queued (Queue), or broadcast (Channel).
Added constructors NewQueueMessage/NewChannelMessage, type helpers
IsQueueMessage/IsChannelMessage/IsDirectMessage/IsClaimed, and
Validate() for mutual exclusivity checks.

Also fixes build error in mail_queue.go (QueueConfig struct nil comparison).

Closes gt-xfqh1e.4

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gastown/crew/jack
2026-01-14 21:14:36 -08:00
committed by Steve Yegge
parent a244c3d498
commit 8eafcc8a16
3 changed files with 530 additions and 8 deletions

View File

@@ -35,7 +35,7 @@ func runMailClaim(cmd *cobra.Command, args []string) error {
}
queueCfg, ok := cfg.Queues[queueName]
if !ok || queueCfg == nil {
if !ok {
return fmt.Errorf("unknown queue: %s", queueName)
}

View File

@@ -107,6 +107,22 @@ type Message struct {
// CC contains addresses that should receive a copy of this message.
// CC'd recipients see the message in their inbox but are not the primary recipient.
CC []string `json:"cc,omitempty"`
// Queue is the queue name for queue-routed messages.
// Mutually exclusive with To and Channel - a message is either direct, queued, or broadcast.
Queue string `json:"queue,omitempty"`
// Channel is the channel name for broadcast messages.
// Mutually exclusive with To and Queue - a message is either direct, queued, or broadcast.
Channel string `json:"channel,omitempty"`
// ClaimedBy is the agent that claimed this queue message.
// Only set for queue messages after claiming.
ClaimedBy string `json:"claimed_by,omitempty"`
// ClaimedAt is when the queue message was claimed.
// Only set for queue messages after claiming.
ClaimedAt *time.Time `json:"claimed_at,omitempty"`
}
// NewMessage creates a new message with a generated ID and thread ID.
@@ -142,6 +158,92 @@ func NewReplyMessage(from, to, subject, body string, original *Message) *Message
}
}
// NewQueueMessage creates a message destined for a queue.
// Queue messages have no direct recipient - they are claimed by eligible agents.
func NewQueueMessage(from, queue, subject, body string) *Message {
return &Message{
ID: generateID(),
From: from,
Queue: queue,
Subject: subject,
Body: body,
Timestamp: time.Now(),
Read: false,
Priority: PriorityNormal,
Type: TypeTask, // Queue messages are typically tasks
ThreadID: generateThreadID(),
}
}
// NewChannelMessage creates a broadcast message for a channel.
// Channel messages are visible to all readers of the channel.
func NewChannelMessage(from, channel, subject, body string) *Message {
return &Message{
ID: generateID(),
From: from,
Channel: channel,
Subject: subject,
Body: body,
Timestamp: time.Now(),
Read: false,
Priority: PriorityNormal,
Type: TypeNotification,
ThreadID: generateThreadID(),
}
}
// IsQueueMessage returns true if this is a queue-routed message.
func (m *Message) IsQueueMessage() bool {
return m.Queue != ""
}
// IsChannelMessage returns true if this is a channel broadcast message.
func (m *Message) IsChannelMessage() bool {
return m.Channel != ""
}
// IsDirectMessage returns true if this is a direct (To-addressed) message.
func (m *Message) IsDirectMessage() bool {
return m.Queue == "" && m.Channel == "" && m.To != ""
}
// IsClaimed returns true if this queue message has been claimed.
func (m *Message) IsClaimed() bool {
return m.ClaimedBy != ""
}
// Validate checks that the message has a valid routing configuration.
// Returns an error if to, queue, and channel are not mutually exclusive.
func (m *Message) Validate() error {
count := 0
if m.To != "" {
count++
}
if m.Queue != "" {
count++
}
if m.Channel != "" {
count++
}
if count == 0 {
return fmt.Errorf("message must have exactly one of: to, queue, or channel")
}
if count > 1 {
return fmt.Errorf("message cannot have multiple routing targets (to, queue, channel are mutually exclusive)")
}
// ClaimedBy/ClaimedAt only valid for queue messages
if m.ClaimedBy != "" && m.Queue == "" {
return fmt.Errorf("claimed_by is only valid for queue messages")
}
if m.ClaimedAt != nil && m.Queue == "" {
return fmt.Errorf("claimed_at is only valid for queue messages")
}
return nil
}
// generateID creates a random message ID.
// Falls back to time-based ID if crypto/rand fails (extremely rare).
func generateID() string {
@@ -170,20 +272,24 @@ type BeadsMessage struct {
ID string `json:"id"`
Title string `json:"title"` // Subject
Description string `json:"description"` // Body
Assignee string `json:"assignee"` // To identity
Assignee string `json:"assignee"` // To identity (for direct messages)
Priority int `json:"priority"` // 0=urgent, 1=high, 2=normal, 3=low
Status string `json:"status"` // open=unread, closed=read
CreatedAt time.Time `json:"created_at"`
Labels []string `json:"labels"` // Metadata labels (from:X, thread:X, reply-to:X, msg-type:X, cc:X)
Labels []string `json:"labels"` // Metadata labels (from:X, thread:X, reply-to:X, msg-type:X, cc:X, queue:X, channel:X, claimed-by:X, claimed-at:X)
Pinned bool `json:"pinned,omitempty"`
Wisp bool `json:"wisp,omitempty"` // Ephemeral message (filtered from JSONL export)
// Cached parsed values (populated by ParseLabels)
sender string
threadID string
replyTo string
msgType string
cc []string // CC recipients
sender string
threadID string
replyTo string
msgType string
cc []string // CC recipients
queue string // Queue name (for queue messages)
channel string // Channel name (for broadcast messages)
claimedBy string // Who claimed the queue message
claimedAt *time.Time // When the queue message was claimed
}
// ParseLabels extracts metadata from the labels array.
@@ -199,6 +305,17 @@ func (bm *BeadsMessage) ParseLabels() {
bm.msgType = strings.TrimPrefix(label, "msg-type:")
} else if strings.HasPrefix(label, "cc:") {
bm.cc = append(bm.cc, strings.TrimPrefix(label, "cc:"))
} else if strings.HasPrefix(label, "queue:") {
bm.queue = strings.TrimPrefix(label, "queue:")
} else if strings.HasPrefix(label, "channel:") {
bm.channel = strings.TrimPrefix(label, "channel:")
} else if strings.HasPrefix(label, "claimed-by:") {
bm.claimedBy = strings.TrimPrefix(label, "claimed-by:")
} else if strings.HasPrefix(label, "claimed-at:") {
ts := strings.TrimPrefix(label, "claimed-at:")
if t, err := time.Parse(time.RFC3339, ts); err == nil {
bm.claimedAt = &t
}
}
}
}
@@ -263,9 +380,51 @@ func (bm *BeadsMessage) ToMessage() *Message {
ReplyTo: bm.replyTo,
Wisp: bm.Wisp,
CC: ccAddrs,
Queue: bm.queue,
Channel: bm.channel,
ClaimedBy: bm.claimedBy,
ClaimedAt: bm.claimedAt,
}
}
// GetQueue returns the queue name for queue messages.
func (bm *BeadsMessage) GetQueue() string {
return bm.queue
}
// GetChannel returns the channel name for broadcast messages.
func (bm *BeadsMessage) GetChannel() string {
return bm.channel
}
// GetClaimedBy returns who claimed the queue message.
func (bm *BeadsMessage) GetClaimedBy() string {
return bm.claimedBy
}
// GetClaimedAt returns when the queue message was claimed.
func (bm *BeadsMessage) GetClaimedAt() *time.Time {
return bm.claimedAt
}
// IsQueueMessage returns true if this is a queue-routed message.
func (bm *BeadsMessage) IsQueueMessage() bool {
bm.ParseLabels()
return bm.queue != ""
}
// IsChannelMessage returns true if this is a channel broadcast message.
func (bm *BeadsMessage) IsChannelMessage() bool {
bm.ParseLabels()
return bm.channel != ""
}
// IsDirectMessage returns true if this is a direct (To-addressed) message.
func (bm *BeadsMessage) IsDirectMessage() bool {
bm.ParseLabels()
return bm.queue == "" && bm.channel == "" && bm.Assignee != ""
}
// HasLabel checks if the message has a specific label.
func (bm *BeadsMessage) HasLabel(label string) bool {
for _, l := range bm.Labels {

View File

@@ -349,3 +349,366 @@ func TestBeadsMessageToMessageEmptyLabels(t *testing.T) {
t.Errorf("ThreadID should be empty, got %q", msg.ThreadID)
}
}
func TestNewQueueMessage(t *testing.T) {
msg := NewQueueMessage("mayor/", "work-requests", "New Task", "Please process this")
if msg.From != "mayor/" {
t.Errorf("From = %q, want 'mayor/'", msg.From)
}
if msg.Queue != "work-requests" {
t.Errorf("Queue = %q, want 'work-requests'", msg.Queue)
}
if msg.To != "" {
t.Errorf("To should be empty for queue messages, got %q", msg.To)
}
if msg.Channel != "" {
t.Errorf("Channel should be empty for queue messages, got %q", msg.Channel)
}
if msg.Type != TypeTask {
t.Errorf("Type = %q, want TypeTask", msg.Type)
}
if msg.ID == "" {
t.Error("ID should be generated")
}
if msg.ThreadID == "" {
t.Error("ThreadID should be generated")
}
}
func TestNewChannelMessage(t *testing.T) {
msg := NewChannelMessage("deacon/", "alerts", "System Alert", "System is healthy")
if msg.From != "deacon/" {
t.Errorf("From = %q, want 'deacon/'", msg.From)
}
if msg.Channel != "alerts" {
t.Errorf("Channel = %q, want 'alerts'", msg.Channel)
}
if msg.To != "" {
t.Errorf("To should be empty for channel messages, got %q", msg.To)
}
if msg.Queue != "" {
t.Errorf("Queue should be empty for channel messages, got %q", msg.Queue)
}
if msg.Type != TypeNotification {
t.Errorf("Type = %q, want TypeNotification", msg.Type)
}
}
func TestMessageIsQueueMessage(t *testing.T) {
directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body")
queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body")
channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body")
if directMsg.IsQueueMessage() {
t.Error("Direct message should not be a queue message")
}
if !queueMsg.IsQueueMessage() {
t.Error("Queue message should be a queue message")
}
if channelMsg.IsQueueMessage() {
t.Error("Channel message should not be a queue message")
}
}
func TestMessageIsChannelMessage(t *testing.T) {
directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body")
queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body")
channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body")
if directMsg.IsChannelMessage() {
t.Error("Direct message should not be a channel message")
}
if queueMsg.IsChannelMessage() {
t.Error("Queue message should not be a channel message")
}
if !channelMsg.IsChannelMessage() {
t.Error("Channel message should be a channel message")
}
}
func TestMessageIsDirectMessage(t *testing.T) {
directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body")
queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body")
channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body")
if !directMsg.IsDirectMessage() {
t.Error("Direct message should be a direct message")
}
if queueMsg.IsDirectMessage() {
t.Error("Queue message should not be a direct message")
}
if channelMsg.IsDirectMessage() {
t.Error("Channel message should not be a direct message")
}
}
func TestMessageValidate(t *testing.T) {
tests := []struct {
name string
msg *Message
wantErr bool
errMsg string
}{
{
name: "valid direct message",
msg: NewMessage("mayor/", "gastown/Toast", "Test", "Body"),
wantErr: false,
},
{
name: "valid queue message",
msg: NewQueueMessage("mayor/", "work-requests", "Task", "Body"),
wantErr: false,
},
{
name: "valid channel message",
msg: NewChannelMessage("deacon/", "alerts", "Alert", "Body"),
wantErr: false,
},
{
name: "no routing target",
msg: &Message{
ID: "msg-001",
From: "mayor/",
Subject: "Test",
},
wantErr: true,
errMsg: "must have exactly one of",
},
{
name: "both to and queue",
msg: &Message{
ID: "msg-001",
From: "mayor/",
To: "gastown/Toast",
Queue: "work-requests",
Subject: "Test",
},
wantErr: true,
errMsg: "mutually exclusive",
},
{
name: "both to and channel",
msg: &Message{
ID: "msg-001",
From: "mayor/",
To: "gastown/Toast",
Channel: "alerts",
Subject: "Test",
},
wantErr: true,
errMsg: "mutually exclusive",
},
{
name: "both queue and channel",
msg: &Message{
ID: "msg-001",
From: "mayor/",
Queue: "work-requests",
Channel: "alerts",
Subject: "Test",
},
wantErr: true,
errMsg: "mutually exclusive",
},
{
name: "claimed_by on non-queue message",
msg: &Message{
ID: "msg-001",
From: "mayor/",
To: "gastown/Toast",
Subject: "Test",
ClaimedBy: "gastown/nux",
},
wantErr: true,
errMsg: "claimed_by is only valid for queue messages",
},
{
name: "claimed_by on queue message is valid",
msg: &Message{
ID: "msg-001",
From: "mayor/",
Queue: "work-requests",
Subject: "Test",
ClaimedBy: "gastown/nux",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.msg.Validate()
if tt.wantErr {
if err == nil {
t.Error("expected error but got nil")
} else if tt.errMsg != "" && !containsString(err.Error(), tt.errMsg) {
t.Errorf("error %q should contain %q", err.Error(), tt.errMsg)
}
} else {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
})
}
}
func containsString(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
(len(s) > 0 && len(substr) > 0 && findSubstring(s, substr)))
}
func findSubstring(s, substr string) bool {
for i := 0; i <= len(s)-len(substr); i++ {
if s[i:i+len(substr)] == substr {
return true
}
}
return false
}
func TestBeadsMessageParseQueueChannelLabels(t *testing.T) {
claimedTime := time.Date(2026, 1, 14, 12, 0, 0, 0, time.UTC)
claimedAtStr := claimedTime.Format(time.RFC3339)
bm := BeadsMessage{
ID: "hq-queue",
Title: "Queue Message",
Description: "Test queue message",
Status: "open",
Labels: []string{
"from:mayor/",
"queue:work-requests",
"claimed-by:gastown/nux",
"claimed-at:" + claimedAtStr,
},
Priority: 2,
}
msg := bm.ToMessage()
if msg.Queue != "work-requests" {
t.Errorf("Queue = %q, want 'work-requests'", msg.Queue)
}
if msg.ClaimedBy != "gastown/nux" {
t.Errorf("ClaimedBy = %q, want 'gastown/nux'", msg.ClaimedBy)
}
if msg.ClaimedAt == nil {
t.Error("ClaimedAt should not be nil")
} else if !msg.ClaimedAt.Equal(claimedTime) {
t.Errorf("ClaimedAt = %v, want %v", msg.ClaimedAt, claimedTime)
}
}
func TestBeadsMessageParseChannelLabel(t *testing.T) {
bm := BeadsMessage{
ID: "hq-channel",
Title: "Channel Message",
Description: "Test channel message",
Status: "open",
Labels: []string{"from:deacon/", "channel:alerts"},
Priority: 2,
}
msg := bm.ToMessage()
if msg.Channel != "alerts" {
t.Errorf("Channel = %q, want 'alerts'", msg.Channel)
}
if msg.Queue != "" {
t.Errorf("Queue should be empty, got %q", msg.Queue)
}
}
func TestBeadsMessageIsQueueMessage(t *testing.T) {
queueMsg := BeadsMessage{
ID: "hq-queue",
Labels: []string{"queue:work-requests"},
}
directMsg := BeadsMessage{
ID: "hq-direct",
Assignee: "gastown/Toast",
}
channelMsg := BeadsMessage{
ID: "hq-channel",
Labels: []string{"channel:alerts"},
}
if !queueMsg.IsQueueMessage() {
t.Error("Queue message should be identified as queue message")
}
if directMsg.IsQueueMessage() {
t.Error("Direct message should not be identified as queue message")
}
if channelMsg.IsQueueMessage() {
t.Error("Channel message should not be identified as queue message")
}
}
func TestBeadsMessageIsChannelMessage(t *testing.T) {
queueMsg := BeadsMessage{
ID: "hq-queue",
Labels: []string{"queue:work-requests"},
}
directMsg := BeadsMessage{
ID: "hq-direct",
Assignee: "gastown/Toast",
}
channelMsg := BeadsMessage{
ID: "hq-channel",
Labels: []string{"channel:alerts"},
}
if queueMsg.IsChannelMessage() {
t.Error("Queue message should not be identified as channel message")
}
if directMsg.IsChannelMessage() {
t.Error("Direct message should not be identified as channel message")
}
if !channelMsg.IsChannelMessage() {
t.Error("Channel message should be identified as channel message")
}
}
func TestBeadsMessageIsDirectMessage(t *testing.T) {
queueMsg := BeadsMessage{
ID: "hq-queue",
Labels: []string{"queue:work-requests"},
}
directMsg := BeadsMessage{
ID: "hq-direct",
Assignee: "gastown/Toast",
}
channelMsg := BeadsMessage{
ID: "hq-channel",
Labels: []string{"channel:alerts"},
}
if queueMsg.IsDirectMessage() {
t.Error("Queue message should not be identified as direct message")
}
if !directMsg.IsDirectMessage() {
t.Error("Direct message should be identified as direct message")
}
if channelMsg.IsDirectMessage() {
t.Error("Channel message should not be identified as direct message")
}
}
func TestMessageIsClaimed(t *testing.T) {
unclaimed := NewQueueMessage("mayor/", "work-requests", "Task", "Body")
if unclaimed.IsClaimed() {
t.Error("Unclaimed message should not be claimed")
}
claimed := NewQueueMessage("mayor/", "work-requests", "Task", "Body")
claimed.ClaimedBy = "gastown/nux"
now := time.Now()
claimed.ClaimedAt = &now
if !claimed.IsClaimed() {
t.Error("Claimed message should be claimed")
}
}