From 5378e566a3a4be72e0a776f4860ba39705794845 Mon Sep 17 00:00:00 2001 From: gastown/polecats/capable Date: Tue, 30 Dec 2025 22:46:25 -0800 Subject: [PATCH] feat: Add sendToQueue() for queue message delivery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements queue message delivery in internal/mail/router.go: - Validates queue exists via expandQueue() - Creates single message (no fan-out unlike lists) - Stores in town-level beads with queue metadata label - Uses queue:name as assignee for inbox filtering 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/mail/router.go | 82 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/internal/mail/router.go b/internal/mail/router.go index d652c6e2..fb9b78be 100644 --- a/internal/mail/router.go +++ b/internal/mail/router.go @@ -478,12 +478,19 @@ func (r *Router) shouldBeWisp(msg *Message) bool { // Supports fan-out for: // - Mailing lists (list:name) - fans out to all list members // - @group addresses - resolves and fans out to matching agents +// Supports queue delivery for: +// - Queues (queue:name) - stores single message for worker claiming func (r *Router) Send(msg *Message) error { // Check for mailing list address if isListAddress(msg.To) { return r.sendToList(msg) } + // Check for queue address - single message for claiming + if isQueueAddress(msg.To) { + return r.sendToQueue(msg) + } + // Check for @group address - resolve and fan-out if isGroupAddress(msg.To) { return r.sendToGroup(msg) @@ -642,6 +649,81 @@ func (r *Router) ExpandListAddress(address string) ([]string, error) { return r.expandList(parseListName(address)) } +// sendToQueue delivers a message to a queue for worker claiming. +// Unlike sendToList, this creates a SINGLE message (no fan-out). +// The message is stored in town-level beads with queue metadata. +// Workers claim messages using bd update --claimed-by. +func (r *Router) sendToQueue(msg *Message) error { + queueName := parseQueueName(msg.To) + + // Validate queue exists in messaging config + _, err := r.expandQueue(queueName) + if err != nil { + return err + } + + // Build labels for from/thread/reply-to/cc plus queue metadata + var labels []string + labels = append(labels, "from:"+msg.From) + labels = append(labels, "queue:"+queueName) + if msg.ThreadID != "" { + labels = append(labels, "thread:"+msg.ThreadID) + } + if msg.ReplyTo != "" { + labels = append(labels, "reply-to:"+msg.ReplyTo) + } + for _, cc := range msg.CC { + ccIdentity := addressToIdentity(cc) + labels = append(labels, "cc:"+ccIdentity) + } + + // Build command: bd create --type=message --assignee=queue: -d + // Use queue: as assignee so inbox queries can filter by queue + args := []string{"create", msg.Subject, + "--type", "message", + "--assignee", msg.To, // queue:name + "-d", msg.Body, + } + + // Add priority flag + beadsPriority := PriorityToBeads(msg.Priority) + args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority)) + + // Add labels (includes queue name for filtering) + if len(labels) > 0 { + args = append(args, "--labels", strings.Join(labels, ",")) + } + + // Add actor for attribution (sender identity) + args = append(args, "--actor", msg.From) + + // Queue messages are never ephemeral - they need to persist until claimed + // (deliberately not checking shouldBeWisp) + + // Queue messages go to town-level beads (shared location) + beadsDir := r.resolveBeadsDir("") + cmd := exec.Command("bd", args...) + cmd.Env = append(cmd.Environ(), + "BEADS_DIR="+beadsDir, + ) + cmd.Dir = filepath.Dir(beadsDir) // Run in parent of .beads + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return errors.New(errMsg) + } + return fmt.Errorf("sending to queue %s: %w", queueName, err) + } + + // No notification for queue messages - workers poll or check on their own schedule + + return nil +} + // isSelfMail returns true if sender and recipient are the same identity. // Normalizes addresses by removing trailing slashes for comparison. func isSelfMail(from, to string) bool {