From 8eafcc8a1697e9ca2992126d607019b11cd5cdbc Mon Sep 17 00:00:00 2001 From: gastown/crew/jack Date: Wed, 14 Jan 2026 21:14:36 -0800 Subject: [PATCH] feat(mail): extend message bead for queues/channels Add queue/channel routing fields to message beads: - queue: string (queue name, mutually exclusive with to/channel) - channel: string (channel name, mutually exclusive with to/queue) - claimed_by: string (who claimed queue message) - claimed_at: timestamp (when claimed) Messages can now be direct (To), queued (Queue), or broadcast (Channel). Added constructors NewQueueMessage/NewChannelMessage, type helpers IsQueueMessage/IsChannelMessage/IsDirectMessage/IsClaimed, and Validate() for mutual exclusivity checks. Also fixes build error in mail_queue.go (QueueConfig struct nil comparison). Closes gt-xfqh1e.4 Co-Authored-By: Claude Opus 4.5 --- internal/cmd/mail_queue.go | 2 +- internal/mail/types.go | 173 ++++++++++++++++- internal/mail/types_test.go | 363 ++++++++++++++++++++++++++++++++++++ 3 files changed, 530 insertions(+), 8 deletions(-) diff --git a/internal/cmd/mail_queue.go b/internal/cmd/mail_queue.go index 0ad4a141..887e689e 100644 --- a/internal/cmd/mail_queue.go +++ b/internal/cmd/mail_queue.go @@ -35,7 +35,7 @@ func runMailClaim(cmd *cobra.Command, args []string) error { } queueCfg, ok := cfg.Queues[queueName] - if !ok || queueCfg == nil { + if !ok { return fmt.Errorf("unknown queue: %s", queueName) } diff --git a/internal/mail/types.go b/internal/mail/types.go index 24b904e2..b2a2346d 100644 --- a/internal/mail/types.go +++ b/internal/mail/types.go @@ -107,6 +107,22 @@ type Message struct { // CC contains addresses that should receive a copy of this message. // CC'd recipients see the message in their inbox but are not the primary recipient. CC []string `json:"cc,omitempty"` + + // Queue is the queue name for queue-routed messages. + // Mutually exclusive with To and Channel - a message is either direct, queued, or broadcast. + Queue string `json:"queue,omitempty"` + + // Channel is the channel name for broadcast messages. + // Mutually exclusive with To and Queue - a message is either direct, queued, or broadcast. + Channel string `json:"channel,omitempty"` + + // ClaimedBy is the agent that claimed this queue message. + // Only set for queue messages after claiming. + ClaimedBy string `json:"claimed_by,omitempty"` + + // ClaimedAt is when the queue message was claimed. + // Only set for queue messages after claiming. + ClaimedAt *time.Time `json:"claimed_at,omitempty"` } // NewMessage creates a new message with a generated ID and thread ID. @@ -142,6 +158,92 @@ func NewReplyMessage(from, to, subject, body string, original *Message) *Message } } +// NewQueueMessage creates a message destined for a queue. +// Queue messages have no direct recipient - they are claimed by eligible agents. +func NewQueueMessage(from, queue, subject, body string) *Message { + return &Message{ + ID: generateID(), + From: from, + Queue: queue, + Subject: subject, + Body: body, + Timestamp: time.Now(), + Read: false, + Priority: PriorityNormal, + Type: TypeTask, // Queue messages are typically tasks + ThreadID: generateThreadID(), + } +} + +// NewChannelMessage creates a broadcast message for a channel. +// Channel messages are visible to all readers of the channel. +func NewChannelMessage(from, channel, subject, body string) *Message { + return &Message{ + ID: generateID(), + From: from, + Channel: channel, + Subject: subject, + Body: body, + Timestamp: time.Now(), + Read: false, + Priority: PriorityNormal, + Type: TypeNotification, + ThreadID: generateThreadID(), + } +} + +// IsQueueMessage returns true if this is a queue-routed message. +func (m *Message) IsQueueMessage() bool { + return m.Queue != "" +} + +// IsChannelMessage returns true if this is a channel broadcast message. +func (m *Message) IsChannelMessage() bool { + return m.Channel != "" +} + +// IsDirectMessage returns true if this is a direct (To-addressed) message. +func (m *Message) IsDirectMessage() bool { + return m.Queue == "" && m.Channel == "" && m.To != "" +} + +// IsClaimed returns true if this queue message has been claimed. +func (m *Message) IsClaimed() bool { + return m.ClaimedBy != "" +} + +// Validate checks that the message has a valid routing configuration. +// Returns an error if to, queue, and channel are not mutually exclusive. +func (m *Message) Validate() error { + count := 0 + if m.To != "" { + count++ + } + if m.Queue != "" { + count++ + } + if m.Channel != "" { + count++ + } + + if count == 0 { + return fmt.Errorf("message must have exactly one of: to, queue, or channel") + } + if count > 1 { + return fmt.Errorf("message cannot have multiple routing targets (to, queue, channel are mutually exclusive)") + } + + // ClaimedBy/ClaimedAt only valid for queue messages + if m.ClaimedBy != "" && m.Queue == "" { + return fmt.Errorf("claimed_by is only valid for queue messages") + } + if m.ClaimedAt != nil && m.Queue == "" { + return fmt.Errorf("claimed_at is only valid for queue messages") + } + + return nil +} + // generateID creates a random message ID. // Falls back to time-based ID if crypto/rand fails (extremely rare). func generateID() string { @@ -170,20 +272,24 @@ type BeadsMessage struct { ID string `json:"id"` Title string `json:"title"` // Subject Description string `json:"description"` // Body - Assignee string `json:"assignee"` // To identity + Assignee string `json:"assignee"` // To identity (for direct messages) Priority int `json:"priority"` // 0=urgent, 1=high, 2=normal, 3=low Status string `json:"status"` // open=unread, closed=read CreatedAt time.Time `json:"created_at"` - Labels []string `json:"labels"` // Metadata labels (from:X, thread:X, reply-to:X, msg-type:X, cc:X) + Labels []string `json:"labels"` // Metadata labels (from:X, thread:X, reply-to:X, msg-type:X, cc:X, queue:X, channel:X, claimed-by:X, claimed-at:X) Pinned bool `json:"pinned,omitempty"` Wisp bool `json:"wisp,omitempty"` // Ephemeral message (filtered from JSONL export) // Cached parsed values (populated by ParseLabels) - sender string - threadID string - replyTo string - msgType string - cc []string // CC recipients + sender string + threadID string + replyTo string + msgType string + cc []string // CC recipients + queue string // Queue name (for queue messages) + channel string // Channel name (for broadcast messages) + claimedBy string // Who claimed the queue message + claimedAt *time.Time // When the queue message was claimed } // ParseLabels extracts metadata from the labels array. @@ -199,6 +305,17 @@ func (bm *BeadsMessage) ParseLabels() { bm.msgType = strings.TrimPrefix(label, "msg-type:") } else if strings.HasPrefix(label, "cc:") { bm.cc = append(bm.cc, strings.TrimPrefix(label, "cc:")) + } else if strings.HasPrefix(label, "queue:") { + bm.queue = strings.TrimPrefix(label, "queue:") + } else if strings.HasPrefix(label, "channel:") { + bm.channel = strings.TrimPrefix(label, "channel:") + } else if strings.HasPrefix(label, "claimed-by:") { + bm.claimedBy = strings.TrimPrefix(label, "claimed-by:") + } else if strings.HasPrefix(label, "claimed-at:") { + ts := strings.TrimPrefix(label, "claimed-at:") + if t, err := time.Parse(time.RFC3339, ts); err == nil { + bm.claimedAt = &t + } } } } @@ -263,9 +380,51 @@ func (bm *BeadsMessage) ToMessage() *Message { ReplyTo: bm.replyTo, Wisp: bm.Wisp, CC: ccAddrs, + Queue: bm.queue, + Channel: bm.channel, + ClaimedBy: bm.claimedBy, + ClaimedAt: bm.claimedAt, } } +// GetQueue returns the queue name for queue messages. +func (bm *BeadsMessage) GetQueue() string { + return bm.queue +} + +// GetChannel returns the channel name for broadcast messages. +func (bm *BeadsMessage) GetChannel() string { + return bm.channel +} + +// GetClaimedBy returns who claimed the queue message. +func (bm *BeadsMessage) GetClaimedBy() string { + return bm.claimedBy +} + +// GetClaimedAt returns when the queue message was claimed. +func (bm *BeadsMessage) GetClaimedAt() *time.Time { + return bm.claimedAt +} + +// IsQueueMessage returns true if this is a queue-routed message. +func (bm *BeadsMessage) IsQueueMessage() bool { + bm.ParseLabels() + return bm.queue != "" +} + +// IsChannelMessage returns true if this is a channel broadcast message. +func (bm *BeadsMessage) IsChannelMessage() bool { + bm.ParseLabels() + return bm.channel != "" +} + +// IsDirectMessage returns true if this is a direct (To-addressed) message. +func (bm *BeadsMessage) IsDirectMessage() bool { + bm.ParseLabels() + return bm.queue == "" && bm.channel == "" && bm.Assignee != "" +} + // HasLabel checks if the message has a specific label. func (bm *BeadsMessage) HasLabel(label string) bool { for _, l := range bm.Labels { diff --git a/internal/mail/types_test.go b/internal/mail/types_test.go index 5678bea5..0be64ef7 100644 --- a/internal/mail/types_test.go +++ b/internal/mail/types_test.go @@ -349,3 +349,366 @@ func TestBeadsMessageToMessageEmptyLabels(t *testing.T) { t.Errorf("ThreadID should be empty, got %q", msg.ThreadID) } } + +func TestNewQueueMessage(t *testing.T) { + msg := NewQueueMessage("mayor/", "work-requests", "New Task", "Please process this") + + if msg.From != "mayor/" { + t.Errorf("From = %q, want 'mayor/'", msg.From) + } + if msg.Queue != "work-requests" { + t.Errorf("Queue = %q, want 'work-requests'", msg.Queue) + } + if msg.To != "" { + t.Errorf("To should be empty for queue messages, got %q", msg.To) + } + if msg.Channel != "" { + t.Errorf("Channel should be empty for queue messages, got %q", msg.Channel) + } + if msg.Type != TypeTask { + t.Errorf("Type = %q, want TypeTask", msg.Type) + } + if msg.ID == "" { + t.Error("ID should be generated") + } + if msg.ThreadID == "" { + t.Error("ThreadID should be generated") + } +} + +func TestNewChannelMessage(t *testing.T) { + msg := NewChannelMessage("deacon/", "alerts", "System Alert", "System is healthy") + + if msg.From != "deacon/" { + t.Errorf("From = %q, want 'deacon/'", msg.From) + } + if msg.Channel != "alerts" { + t.Errorf("Channel = %q, want 'alerts'", msg.Channel) + } + if msg.To != "" { + t.Errorf("To should be empty for channel messages, got %q", msg.To) + } + if msg.Queue != "" { + t.Errorf("Queue should be empty for channel messages, got %q", msg.Queue) + } + if msg.Type != TypeNotification { + t.Errorf("Type = %q, want TypeNotification", msg.Type) + } +} + +func TestMessageIsQueueMessage(t *testing.T) { + directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body") + queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body") + channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body") + + if directMsg.IsQueueMessage() { + t.Error("Direct message should not be a queue message") + } + if !queueMsg.IsQueueMessage() { + t.Error("Queue message should be a queue message") + } + if channelMsg.IsQueueMessage() { + t.Error("Channel message should not be a queue message") + } +} + +func TestMessageIsChannelMessage(t *testing.T) { + directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body") + queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body") + channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body") + + if directMsg.IsChannelMessage() { + t.Error("Direct message should not be a channel message") + } + if queueMsg.IsChannelMessage() { + t.Error("Queue message should not be a channel message") + } + if !channelMsg.IsChannelMessage() { + t.Error("Channel message should be a channel message") + } +} + +func TestMessageIsDirectMessage(t *testing.T) { + directMsg := NewMessage("mayor/", "gastown/Toast", "Test", "Body") + queueMsg := NewQueueMessage("mayor/", "work-requests", "Task", "Body") + channelMsg := NewChannelMessage("deacon/", "alerts", "Alert", "Body") + + if !directMsg.IsDirectMessage() { + t.Error("Direct message should be a direct message") + } + if queueMsg.IsDirectMessage() { + t.Error("Queue message should not be a direct message") + } + if channelMsg.IsDirectMessage() { + t.Error("Channel message should not be a direct message") + } +} + +func TestMessageValidate(t *testing.T) { + tests := []struct { + name string + msg *Message + wantErr bool + errMsg string + }{ + { + name: "valid direct message", + msg: NewMessage("mayor/", "gastown/Toast", "Test", "Body"), + wantErr: false, + }, + { + name: "valid queue message", + msg: NewQueueMessage("mayor/", "work-requests", "Task", "Body"), + wantErr: false, + }, + { + name: "valid channel message", + msg: NewChannelMessage("deacon/", "alerts", "Alert", "Body"), + wantErr: false, + }, + { + name: "no routing target", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + Subject: "Test", + }, + wantErr: true, + errMsg: "must have exactly one of", + }, + { + name: "both to and queue", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + To: "gastown/Toast", + Queue: "work-requests", + Subject: "Test", + }, + wantErr: true, + errMsg: "mutually exclusive", + }, + { + name: "both to and channel", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + To: "gastown/Toast", + Channel: "alerts", + Subject: "Test", + }, + wantErr: true, + errMsg: "mutually exclusive", + }, + { + name: "both queue and channel", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + Queue: "work-requests", + Channel: "alerts", + Subject: "Test", + }, + wantErr: true, + errMsg: "mutually exclusive", + }, + { + name: "claimed_by on non-queue message", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + To: "gastown/Toast", + Subject: "Test", + ClaimedBy: "gastown/nux", + }, + wantErr: true, + errMsg: "claimed_by is only valid for queue messages", + }, + { + name: "claimed_by on queue message is valid", + msg: &Message{ + ID: "msg-001", + From: "mayor/", + Queue: "work-requests", + Subject: "Test", + ClaimedBy: "gastown/nux", + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.msg.Validate() + if tt.wantErr { + if err == nil { + t.Error("expected error but got nil") + } else if tt.errMsg != "" && !containsString(err.Error(), tt.errMsg) { + t.Errorf("error %q should contain %q", err.Error(), tt.errMsg) + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + } + }) + } +} + +func containsString(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(substr) == 0 || + (len(s) > 0 && len(substr) > 0 && findSubstring(s, substr))) +} + +func findSubstring(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} + +func TestBeadsMessageParseQueueChannelLabels(t *testing.T) { + claimedTime := time.Date(2026, 1, 14, 12, 0, 0, 0, time.UTC) + claimedAtStr := claimedTime.Format(time.RFC3339) + + bm := BeadsMessage{ + ID: "hq-queue", + Title: "Queue Message", + Description: "Test queue message", + Status: "open", + Labels: []string{ + "from:mayor/", + "queue:work-requests", + "claimed-by:gastown/nux", + "claimed-at:" + claimedAtStr, + }, + Priority: 2, + } + + msg := bm.ToMessage() + + if msg.Queue != "work-requests" { + t.Errorf("Queue = %q, want 'work-requests'", msg.Queue) + } + if msg.ClaimedBy != "gastown/nux" { + t.Errorf("ClaimedBy = %q, want 'gastown/nux'", msg.ClaimedBy) + } + if msg.ClaimedAt == nil { + t.Error("ClaimedAt should not be nil") + } else if !msg.ClaimedAt.Equal(claimedTime) { + t.Errorf("ClaimedAt = %v, want %v", msg.ClaimedAt, claimedTime) + } +} + +func TestBeadsMessageParseChannelLabel(t *testing.T) { + bm := BeadsMessage{ + ID: "hq-channel", + Title: "Channel Message", + Description: "Test channel message", + Status: "open", + Labels: []string{"from:deacon/", "channel:alerts"}, + Priority: 2, + } + + msg := bm.ToMessage() + + if msg.Channel != "alerts" { + t.Errorf("Channel = %q, want 'alerts'", msg.Channel) + } + if msg.Queue != "" { + t.Errorf("Queue should be empty, got %q", msg.Queue) + } +} + +func TestBeadsMessageIsQueueMessage(t *testing.T) { + queueMsg := BeadsMessage{ + ID: "hq-queue", + Labels: []string{"queue:work-requests"}, + } + directMsg := BeadsMessage{ + ID: "hq-direct", + Assignee: "gastown/Toast", + } + channelMsg := BeadsMessage{ + ID: "hq-channel", + Labels: []string{"channel:alerts"}, + } + + if !queueMsg.IsQueueMessage() { + t.Error("Queue message should be identified as queue message") + } + if directMsg.IsQueueMessage() { + t.Error("Direct message should not be identified as queue message") + } + if channelMsg.IsQueueMessage() { + t.Error("Channel message should not be identified as queue message") + } +} + +func TestBeadsMessageIsChannelMessage(t *testing.T) { + queueMsg := BeadsMessage{ + ID: "hq-queue", + Labels: []string{"queue:work-requests"}, + } + directMsg := BeadsMessage{ + ID: "hq-direct", + Assignee: "gastown/Toast", + } + channelMsg := BeadsMessage{ + ID: "hq-channel", + Labels: []string{"channel:alerts"}, + } + + if queueMsg.IsChannelMessage() { + t.Error("Queue message should not be identified as channel message") + } + if directMsg.IsChannelMessage() { + t.Error("Direct message should not be identified as channel message") + } + if !channelMsg.IsChannelMessage() { + t.Error("Channel message should be identified as channel message") + } +} + +func TestBeadsMessageIsDirectMessage(t *testing.T) { + queueMsg := BeadsMessage{ + ID: "hq-queue", + Labels: []string{"queue:work-requests"}, + } + directMsg := BeadsMessage{ + ID: "hq-direct", + Assignee: "gastown/Toast", + } + channelMsg := BeadsMessage{ + ID: "hq-channel", + Labels: []string{"channel:alerts"}, + } + + if queueMsg.IsDirectMessage() { + t.Error("Queue message should not be identified as direct message") + } + if !directMsg.IsDirectMessage() { + t.Error("Direct message should be identified as direct message") + } + if channelMsg.IsDirectMessage() { + t.Error("Channel message should not be identified as direct message") + } +} + +func TestMessageIsClaimed(t *testing.T) { + unclaimed := NewQueueMessage("mayor/", "work-requests", "Task", "Body") + if unclaimed.IsClaimed() { + t.Error("Unclaimed message should not be claimed") + } + + claimed := NewQueueMessage("mayor/", "work-requests", "Task", "Body") + claimed.ClaimedBy = "gastown/nux" + now := time.Now() + claimed.ClaimedAt = &now + + if !claimed.IsClaimed() { + t.Error("Claimed message should be claimed") + } +}