feat: Add sendToQueue() for queue message delivery
Implements queue message delivery in internal/mail/router.go: - Validates queue exists via expandQueue() - Creates single message (no fan-out unlike lists) - Stores in town-level beads with queue metadata label - Uses queue:name as assignee for inbox filtering 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Steve Yegge
parent
24d5231661
commit
5378e566a3
@@ -478,12 +478,19 @@ func (r *Router) shouldBeWisp(msg *Message) bool {
|
||||
// Supports fan-out for:
|
||||
// - Mailing lists (list:name) - fans out to all list members
|
||||
// - @group addresses - resolves and fans out to matching agents
|
||||
// Supports queue delivery for:
|
||||
// - Queues (queue:name) - stores single message for worker claiming
|
||||
func (r *Router) Send(msg *Message) error {
|
||||
// Check for mailing list address
|
||||
if isListAddress(msg.To) {
|
||||
return r.sendToList(msg)
|
||||
}
|
||||
|
||||
// Check for queue address - single message for claiming
|
||||
if isQueueAddress(msg.To) {
|
||||
return r.sendToQueue(msg)
|
||||
}
|
||||
|
||||
// Check for @group address - resolve and fan-out
|
||||
if isGroupAddress(msg.To) {
|
||||
return r.sendToGroup(msg)
|
||||
@@ -642,6 +649,81 @@ func (r *Router) ExpandListAddress(address string) ([]string, error) {
|
||||
return r.expandList(parseListName(address))
|
||||
}
|
||||
|
||||
// sendToQueue delivers a message to a queue for worker claiming.
|
||||
// Unlike sendToList, this creates a SINGLE message (no fan-out).
|
||||
// The message is stored in town-level beads with queue metadata.
|
||||
// Workers claim messages using bd update --claimed-by.
|
||||
func (r *Router) sendToQueue(msg *Message) error {
|
||||
queueName := parseQueueName(msg.To)
|
||||
|
||||
// Validate queue exists in messaging config
|
||||
_, err := r.expandQueue(queueName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build labels for from/thread/reply-to/cc plus queue metadata
|
||||
var labels []string
|
||||
labels = append(labels, "from:"+msg.From)
|
||||
labels = append(labels, "queue:"+queueName)
|
||||
if msg.ThreadID != "" {
|
||||
labels = append(labels, "thread:"+msg.ThreadID)
|
||||
}
|
||||
if msg.ReplyTo != "" {
|
||||
labels = append(labels, "reply-to:"+msg.ReplyTo)
|
||||
}
|
||||
for _, cc := range msg.CC {
|
||||
ccIdentity := addressToIdentity(cc)
|
||||
labels = append(labels, "cc:"+ccIdentity)
|
||||
}
|
||||
|
||||
// Build command: bd create <subject> --type=message --assignee=queue:<name> -d <body>
|
||||
// Use queue:<name> as assignee so inbox queries can filter by queue
|
||||
args := []string{"create", msg.Subject,
|
||||
"--type", "message",
|
||||
"--assignee", msg.To, // queue:name
|
||||
"-d", msg.Body,
|
||||
}
|
||||
|
||||
// Add priority flag
|
||||
beadsPriority := PriorityToBeads(msg.Priority)
|
||||
args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority))
|
||||
|
||||
// Add labels (includes queue name for filtering)
|
||||
if len(labels) > 0 {
|
||||
args = append(args, "--labels", strings.Join(labels, ","))
|
||||
}
|
||||
|
||||
// Add actor for attribution (sender identity)
|
||||
args = append(args, "--actor", msg.From)
|
||||
|
||||
// Queue messages are never ephemeral - they need to persist until claimed
|
||||
// (deliberately not checking shouldBeWisp)
|
||||
|
||||
// Queue messages go to town-level beads (shared location)
|
||||
beadsDir := r.resolveBeadsDir("")
|
||||
cmd := exec.Command("bd", args...)
|
||||
cmd.Env = append(cmd.Environ(),
|
||||
"BEADS_DIR="+beadsDir,
|
||||
)
|
||||
cmd.Dir = filepath.Dir(beadsDir) // Run in parent of .beads
|
||||
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
errMsg := strings.TrimSpace(stderr.String())
|
||||
if errMsg != "" {
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
return fmt.Errorf("sending to queue %s: %w", queueName, err)
|
||||
}
|
||||
|
||||
// No notification for queue messages - workers poll or check on their own schedule
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isSelfMail returns true if sender and recipient are the same identity.
|
||||
// Normalizes addresses by removing trailing slashes for comparison.
|
||||
func isSelfMail(from, to string) bool {
|
||||
|
||||
Reference in New Issue
Block a user