From 4f02abb535226f5a722f2749668c33c4006c29df Mon Sep 17 00:00:00 2001 From: gastown/crew/max Date: Thu, 15 Jan 2026 07:22:35 -0800 Subject: [PATCH] fix(mail): add channel routing to router.Send() The router was missing support for beads-native channel addresses. When mail_send.go resolved an address to RecipientChannel, it set msg.To to "channel:" but router.Send() had no handler for this prefix, causing channel messages to fail silently. Added: - isChannelAddress() and parseChannelName() helper functions - sendToChannel() method that creates messages with proper channel: labels for channel queries - Channel validation before sending - Retention enforcement after message creation Also updated docs/beads-native-messaging.md with more comprehensive documentation of the beads-native messaging system. Co-Authored-By: Claude Opus 4.5 --- docs/beads-native-messaging.md | 191 ++++++++++++++------------------- internal/mail/router.go | 88 +++++++++++++++ 2 files changed, 171 insertions(+), 108 deletions(-) diff --git a/docs/beads-native-messaging.md b/docs/beads-native-messaging.md index eb3629e1..7ec3e3ff 100644 --- a/docs/beads-native-messaging.md +++ b/docs/beads-native-messaging.md @@ -1,89 +1,84 @@ # Beads-Native Messaging -This document describes the beads-native messaging system, which extends Gas Town's mail system with first-class support for groups, queues, and channels backed by beads (Git-native storage). +This document describes the beads-native messaging system for Gas Town, which replaces the file-based messaging configuration with persistent beads stored in the town's `.beads` directory. ## Overview -Beads-native messaging introduces three new bead types that integrate with the existing mail system: +Beads-native messaging introduces three new bead types for managing communication: -- **Groups** (`gt:group`) - Named distribution lists for multi-recipient delivery -- **Queues** (`gt:queue`) - Work queues where workers claim items -- **Channels** (`gt:channel`) - Pub/sub broadcast streams with retention policies +- **Groups** (`gt:group`) - Named collections of addresses for mail distribution +- **Queues** (`gt:queue`) - Work queues where messages can be claimed by workers +- **Channels** (`gt:channel`) - Pub/sub broadcast streams with message retention -All three are stored as beads, providing Git-native storage, audit trails, and replication. +All messaging beads use the `hq-` prefix because they are town-level entities that span rigs. ## Bead Types ### Groups (`gt:group`) -Groups are named collections of addresses used for mail distribution. Members can be: -- Direct agent addresses (`gastown/crew/max`) -- Wildcard patterns (`*/witness`, `gastown/*`) -- Nested group names (groups can contain other groups) +Groups are named collections of addresses used for mail distribution. When you send to a group, the message is delivered to all members. -**ID Format:** `hq-group-` (e.g., `hq-group-ops-team`) +**Bead ID format:** `hq-group-` (e.g., `hq-group-ops-team`) **Fields:** - `name` - Unique group name -- `members` - Comma-separated list of addresses/patterns -- `created_by` - Who created the group +- `members` - Comma-separated list of addresses, patterns, or nested group names +- `created_by` - Who created the group (from BD_ACTOR) - `created_at` - ISO 8601 timestamp -**Source:** `internal/beads/beads_group.go` +**Member types:** +- Direct addresses: `gastown/crew/max`, `mayor/`, `deacon/` +- Wildcard patterns: `*/witness`, `gastown/*`, `gastown/crew/*` +- Special patterns: `@town`, `@crew`, `@witnesses` +- Nested groups: Reference other group names ### Queues (`gt:queue`) -Queues are work queues where multiple workers can claim items. Messages sent to a queue are delivered once to a claiming worker. +Queues are work queues where messages wait to be claimed by workers. Unlike groups, each message goes to exactly one claimant. -**ID Format:** `gt-q-` or `hq-q-` (town-level) +**Bead ID format:** `hq-q-` (town-level) or `gt-q-` (rig-level) **Fields:** - `name` - Queue name - `status` - `active`, `paused`, or `closed` - `max_concurrency` - Maximum concurrent workers (0 = unlimited) - `processing_order` - `fifo` or `priority` -- `available_count`, `processing_count`, `completed_count`, `failed_count` - Queue statistics - -**Source:** `internal/beads/beads_queue.go` +- `available_count` - Items ready to process +- `processing_count` - Items currently being processed +- `completed_count` - Items completed +- `failed_count` - Items that failed ### Channels (`gt:channel`) -Channels are pub/sub broadcast streams. Messages sent to a channel are retained according to the channel's retention policy and can be viewed by any subscriber. +Channels are pub/sub streams for broadcasting messages. Messages are retained according to the channel's retention policy. -**ID Format:** `hq-channel-` (e.g., `hq-channel-alerts`) +**Bead ID format:** `hq-channel-` (e.g., `hq-channel-alerts`) **Fields:** - `name` - Unique channel name - `subscribers` - Comma-separated list of subscribed addresses - `status` - `active` or `closed` -- `retention_count` - Number of messages to retain (0 = unlimited) +- `retention_count` - Number of recent messages to retain (0 = unlimited) - `retention_hours` - Hours to retain messages (0 = forever) - `created_by` - Who created the channel - `created_at` - ISO 8601 timestamp -**Retention Enforcement:** -- On-write cleanup: When a message is posted, old messages are pruned if over the limit -- Patrol cleanup: Deacon patrol runs periodic cleanup with 10% buffer to avoid thrashing - -**Source:** `internal/beads/beads_channel.go` - ## CLI Commands -### Group Commands +### Group Management ```bash # List all groups -gt mail group list [--json] +gt mail group list # Show group details -gt mail group show [--json] +gt mail group show -# Create a group with members +# Create a new group with members gt mail group create [members...] gt mail group create ops-team gastown/witness gastown/crew/max -gt mail group create ops-team --member gastown/witness --member gastown/crew/max -# Add member to existing group +# Add member to group gt mail group add # Remove member from group @@ -93,18 +88,16 @@ gt mail group remove gt mail group delete ``` -**Source:** `internal/cmd/mail_group.go` - -### Channel Commands +### Channel Management ```bash # List all channels -gt mail channel [--json] -gt mail channel list [--json] +gt mail channel +gt mail channel list # View channel messages -gt mail channel [--json] -gt mail channel show [--json] +gt mail channel +gt mail channel show # Create a channel with retention policy gt mail channel create [--retain-count=N] [--retain-hours=N] @@ -114,113 +107,95 @@ gt mail channel create alerts --retain-count=100 gt mail channel delete ``` -**Source:** `internal/cmd/mail_channel.go` - ### Sending Messages -The `gt mail send` command supports all address types through the address resolver: +The `gt mail send` command now supports groups, queues, and channels: ```bash -# Send to agent (direct) -gt mail send gastown/crew/max -s "Hello" -m "Message body" +# Send to a group (expands to all members) +gt mail send my-group -s "Subject" -m "Body" -# Send to group (expands to all members) -gt mail send ops-team -s "Alert" -m "Important message" -gt mail send group:ops-team -s "Alert" -m "Explicit group syntax" +# Send to a queue (single message, workers claim) +gt mail send queue:my-queue -s "Work item" -m "Details" -# Send to queue (delivered to one claiming worker) -gt mail send queue:build-queue -s "Job" -m "Build request" +# Send to a channel (broadcast with retention) +gt mail send channel:my-channel -s "Announcement" -m "Content" -# Send to channel (broadcast, retained) -gt mail send channel:alerts -s "Alert" -m "System alert" - -# Send to pattern (wildcards) -gt mail send "*/witness" -s "Witness alert" -m "All witnesses" +# Direct address (unchanged) +gt mail send gastown/crew/max -s "Hello" -m "World" ``` -**Source:** `internal/cmd/mail_send.go` - ## Address Resolution -The address resolver (`internal/mail/resolve.go`) determines how to route messages based on the address format: - -### Resolution Order +When sending mail, addresses are resolved in this order: 1. **Explicit prefix** - If address starts with `group:`, `queue:`, or `channel:`, use that type directly 2. **Contains `/`** - Treat as agent address or pattern (direct delivery) -3. **Starts with `@`** - Check for beads group, then fall back to built-in patterns -4. **Name lookup** - Search in order: group → queue → channel +3. **Starts with `@`** - Special pattern (`@town`, `@crew`, etc.) or beads-native group +4. **Name lookup** - Search for group → queue → channel by name -### Address Formats +If a name matches multiple types (e.g., both a group and a channel named "alerts"), the resolver returns an error and requires an explicit prefix. -| Format | Type | Example | -|--------|------|---------| -| `group:` | Group | `group:ops-team` | -| `queue:` | Queue | `queue:build-queue` | -| `channel:` | Channel | `channel:alerts` | -| `//` | Agent | `gastown/crew/max` | -| `/` | Agent | `gastown/witness` | -| `*/` | Pattern | `*/witness` (all witnesses) | -| `@` | Group/Pattern | `@ops-team` | - -### Conflict Handling - -If a name matches multiple types (e.g., both a group and a channel named "alerts"), the resolver returns an error requiring an explicit prefix: - -``` -ambiguous address "alerts": matches multiple types. Use explicit prefix: group:alerts, channel:alerts -``` - -## Key Files +## Key Implementation Files | File | Description | |------|-------------| | `internal/beads/beads_group.go` | Group bead CRUD operations | | `internal/beads/beads_queue.go` | Queue bead CRUD operations | -| `internal/beads/beads_channel.go` | Channel bead CRUD + retention | +| `internal/beads/beads_channel.go` | Channel bead + retention logic | | `internal/mail/resolve.go` | Address resolution logic | | `internal/cmd/mail_group.go` | Group CLI commands | | `internal/cmd/mail_channel.go` | Channel CLI commands | -| `internal/cmd/mail_send.go` | Send command with resolver | +| `internal/cmd/mail_send.go` | Updated send with resolver | + +## Retention Policy + +Channels support two retention mechanisms: + +- **Count-based** (`--retain-count=N`): Keep only the last N messages +- **Time-based** (`--retain-hours=N`): Delete messages older than N hours + +Retention is enforced: +1. **On-write**: After posting a new message, old messages are pruned +2. **On-patrol**: Deacon patrol runs `PruneAllChannels()` as a backup cleanup + +The patrol uses a 10% buffer to avoid thrashing (only prunes if count > retainCount × 1.1). ## Examples -### Create a Team Distribution Group +### Create a team distribution group ```bash -# Create group -gt mail group create dev-team gastown/crew/max gastown/crew/dennis +# Create a group for the ops team +gt mail group create ops-team gastown/witness gastown/crew/max deacon/ -# Add another member -gt mail group add dev-team gastown/crew/george +# Send to the group +gt mail send ops-team -s "Team meeting" -m "Tomorrow at 10am" -# Send to entire team -gt mail send dev-team -s "Standup" -m "Daily standup in 5 minutes" +# Add a new member +gt mail group add ops-team gastown/crew/dennis ``` -### Set Up a Build Alert Channel +### Set up an alerts channel ```bash -# Create channel with retention -gt mail channel create build-alerts --retain-count=50 +# Create an alerts channel that keeps last 50 messages +gt mail channel create alerts --retain-count=50 -# Send build notifications -gt mail send channel:build-alerts -s "Build #123 passed" -m "All tests green" +# Send an alert +gt mail send channel:alerts -s "Build failed" -m "See CI for details" -# View channel history -gt mail channel build-alerts +# View recent alerts +gt mail channel alerts ``` -### Nested Groups +### Create nested groups ```bash -# Create base groups -gt mail group create witnesses gastown/witness ranchero/witness -gt mail group create crew gastown/crew/max gastown/crew/dennis +# Create role-based groups +gt mail group create witnesses */witness +gt mail group create leads gastown/crew/max gastown/crew/dennis -# Create umbrella group that includes other groups -gt mail group create all-agents witnesses crew deacon/ - -# Send to everyone -gt mail send all-agents -s "Town meeting" -m "All hands meeting at noon" +# Create a group that includes other groups +gt mail group create all-hands witnesses leads mayor/ ``` diff --git a/internal/mail/router.go b/internal/mail/router.go index 29a928da..28509899 100644 --- a/internal/mail/router.go +++ b/internal/mail/router.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" + "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/config" "github.com/steveyegge/gastown/internal/session" "github.com/steveyegge/gastown/internal/tmux" @@ -85,6 +86,16 @@ func parseAnnounceName(address string) string { return strings.TrimPrefix(address, "announce:") } +// isChannelAddress returns true if the address uses channel:name syntax (beads-native channels). +func isChannelAddress(address string) bool { + return strings.HasPrefix(address, "channel:") +} + +// parseChannelName extracts the channel name from a channel:name address. +func parseChannelName(address string) string { + return strings.TrimPrefix(address, "channel:") +} + // expandFromConfig is a generic helper for config-based expansion. // It loads the messaging config and calls the getter to extract the desired value. // This consolidates the common pattern of: check townRoot, load config, lookup in map. @@ -515,6 +526,11 @@ func (r *Router) Send(msg *Message) error { return r.sendToAnnounce(msg) } + // Check for beads-native channel address - broadcast with retention + if isChannelAddress(msg.To) { + return r.sendToChannel(msg) + } + // Check for @group address - resolve and fan-out if isGroupAddress(msg.To) { return r.sendToGroup(msg) @@ -795,6 +811,78 @@ func (r *Router) sendToAnnounce(msg *Message) error { return nil } +// sendToChannel delivers a message to a beads-native channel. +// Creates a message with channel: label for channel queries. +// Retention is enforced by the channel's EnforceChannelRetention after message creation. +func (r *Router) sendToChannel(msg *Message) error { + channelName := parseChannelName(msg.To) + + // Validate channel exists as a beads-native channel + if r.townRoot == "" { + return fmt.Errorf("town root not set, cannot send to channel: %s", channelName) + } + b := beads.New(r.townRoot) + _, fields, err := b.GetChannelBead(channelName) + if err != nil { + return fmt.Errorf("getting channel %s: %w", channelName, err) + } + if fields == nil { + return fmt.Errorf("channel not found: %s", channelName) + } + + // Build labels for from/thread/reply-to/cc plus channel metadata + var labels []string + labels = append(labels, "from:"+msg.From) + labels = append(labels, "channel:"+channelName) + if msg.ThreadID != "" { + labels = append(labels, "thread:"+msg.ThreadID) + } + if msg.ReplyTo != "" { + labels = append(labels, "reply-to:"+msg.ReplyTo) + } + for _, cc := range msg.CC { + ccIdentity := addressToIdentity(cc) + labels = append(labels, "cc:"+ccIdentity) + } + + // Build command: bd create --type=message --assignee=channel: -d + // Use channel: as assignee so queries can filter by channel + args := []string{"create", msg.Subject, + "--type", "message", + "--assignee", msg.To, // channel:name + "-d", msg.Body, + } + + // Add priority flag + beadsPriority := PriorityToBeads(msg.Priority) + args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority)) + + // Add labels (includes channel name for filtering) + if len(labels) > 0 { + args = append(args, "--labels", strings.Join(labels, ",")) + } + + // Add actor for attribution (sender identity) + args = append(args, "--actor", msg.From) + + // Channel messages are never ephemeral - they persist according to retention policy + // (deliberately not checking shouldBeWisp) + + // Channel messages go to town-level beads (shared location) + beadsDir := r.resolveBeadsDir("") + _, err = runBdCommand(args, filepath.Dir(beadsDir), beadsDir) + if err != nil { + return fmt.Errorf("sending to channel %s: %w", channelName, err) + } + + // Enforce channel retention policy (on-write cleanup) + _ = b.EnforceChannelRetention(channelName) + + // No notification for channel messages - readers poll or check on their own schedule + + return nil +} + // pruneAnnounce deletes oldest messages from an announce channel to enforce retention. // If the channel has >= retainCount messages, deletes the oldest until count < retainCount. func (r *Router) pruneAnnounce(announceName string, retainCount int) error {