fix(mail): add channel routing to router.Send()

The router was missing support for beads-native channel addresses.
When mail_send.go resolved an address to RecipientChannel, it set
msg.To to "channel:<name>" but router.Send() had no handler for this
prefix, causing channel messages to fail silently.

Added:
- isChannelAddress() and parseChannelName() helper functions
- sendToChannel() method that creates messages with proper channel:
  labels for channel queries
- Channel validation before sending
- Retention enforcement after message creation

Also updated docs/beads-native-messaging.md with more comprehensive
documentation of the beads-native messaging system.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gastown/crew/max
2026-01-15 07:22:35 -08:00
committed by Steve Yegge
parent cbbf566f06
commit 4f02abb535
2 changed files with 171 additions and 108 deletions

View File

@@ -1,89 +1,84 @@
# Beads-Native Messaging # Beads-Native Messaging
This document describes the beads-native messaging system, which extends Gas Town's mail system with first-class support for groups, queues, and channels backed by beads (Git-native storage). This document describes the beads-native messaging system for Gas Town, which replaces the file-based messaging configuration with persistent beads stored in the town's `.beads` directory.
## Overview ## Overview
Beads-native messaging introduces three new bead types that integrate with the existing mail system: Beads-native messaging introduces three new bead types for managing communication:
- **Groups** (`gt:group`) - Named distribution lists for multi-recipient delivery - **Groups** (`gt:group`) - Named collections of addresses for mail distribution
- **Queues** (`gt:queue`) - Work queues where workers claim items - **Queues** (`gt:queue`) - Work queues where messages can be claimed by workers
- **Channels** (`gt:channel`) - Pub/sub broadcast streams with retention policies - **Channels** (`gt:channel`) - Pub/sub broadcast streams with message retention
All three are stored as beads, providing Git-native storage, audit trails, and replication. All messaging beads use the `hq-` prefix because they are town-level entities that span rigs.
## Bead Types ## Bead Types
### Groups (`gt:group`) ### Groups (`gt:group`)
Groups are named collections of addresses used for mail distribution. Members can be: Groups are named collections of addresses used for mail distribution. When you send to a group, the message is delivered to all members.
- Direct agent addresses (`gastown/crew/max`)
- Wildcard patterns (`*/witness`, `gastown/*`)
- Nested group names (groups can contain other groups)
**ID Format:** `hq-group-<name>` (e.g., `hq-group-ops-team`) **Bead ID format:** `hq-group-<name>` (e.g., `hq-group-ops-team`)
**Fields:** **Fields:**
- `name` - Unique group name - `name` - Unique group name
- `members` - Comma-separated list of addresses/patterns - `members` - Comma-separated list of addresses, patterns, or nested group names
- `created_by` - Who created the group - `created_by` - Who created the group (from BD_ACTOR)
- `created_at` - ISO 8601 timestamp - `created_at` - ISO 8601 timestamp
**Source:** `internal/beads/beads_group.go` **Member types:**
- Direct addresses: `gastown/crew/max`, `mayor/`, `deacon/`
- Wildcard patterns: `*/witness`, `gastown/*`, `gastown/crew/*`
- Special patterns: `@town`, `@crew`, `@witnesses`
- Nested groups: Reference other group names
### Queues (`gt:queue`) ### Queues (`gt:queue`)
Queues are work queues where multiple workers can claim items. Messages sent to a queue are delivered once to a claiming worker. Queues are work queues where messages wait to be claimed by workers. Unlike groups, each message goes to exactly one claimant.
**ID Format:** `gt-q-<name>` or `hq-q-<name>` (town-level) **Bead ID format:** `hq-q-<name>` (town-level) or `gt-q-<name>` (rig-level)
**Fields:** **Fields:**
- `name` - Queue name - `name` - Queue name
- `status` - `active`, `paused`, or `closed` - `status` - `active`, `paused`, or `closed`
- `max_concurrency` - Maximum concurrent workers (0 = unlimited) - `max_concurrency` - Maximum concurrent workers (0 = unlimited)
- `processing_order` - `fifo` or `priority` - `processing_order` - `fifo` or `priority`
- `available_count`, `processing_count`, `completed_count`, `failed_count` - Queue statistics - `available_count` - Items ready to process
- `processing_count` - Items currently being processed
**Source:** `internal/beads/beads_queue.go` - `completed_count` - Items completed
- `failed_count` - Items that failed
### Channels (`gt:channel`) ### Channels (`gt:channel`)
Channels are pub/sub broadcast streams. Messages sent to a channel are retained according to the channel's retention policy and can be viewed by any subscriber. Channels are pub/sub streams for broadcasting messages. Messages are retained according to the channel's retention policy.
**ID Format:** `hq-channel-<name>` (e.g., `hq-channel-alerts`) **Bead ID format:** `hq-channel-<name>` (e.g., `hq-channel-alerts`)
**Fields:** **Fields:**
- `name` - Unique channel name - `name` - Unique channel name
- `subscribers` - Comma-separated list of subscribed addresses - `subscribers` - Comma-separated list of subscribed addresses
- `status` - `active` or `closed` - `status` - `active` or `closed`
- `retention_count` - Number of messages to retain (0 = unlimited) - `retention_count` - Number of recent messages to retain (0 = unlimited)
- `retention_hours` - Hours to retain messages (0 = forever) - `retention_hours` - Hours to retain messages (0 = forever)
- `created_by` - Who created the channel - `created_by` - Who created the channel
- `created_at` - ISO 8601 timestamp - `created_at` - ISO 8601 timestamp
**Retention Enforcement:**
- On-write cleanup: When a message is posted, old messages are pruned if over the limit
- Patrol cleanup: Deacon patrol runs periodic cleanup with 10% buffer to avoid thrashing
**Source:** `internal/beads/beads_channel.go`
## CLI Commands ## CLI Commands
### Group Commands ### Group Management
```bash ```bash
# List all groups # List all groups
gt mail group list [--json] gt mail group list
# Show group details # Show group details
gt mail group show <name> [--json] gt mail group show <name>
# Create a group with members # Create a new group with members
gt mail group create <name> [members...] gt mail group create <name> [members...]
gt mail group create ops-team gastown/witness gastown/crew/max gt mail group create ops-team gastown/witness gastown/crew/max
gt mail group create ops-team --member gastown/witness --member gastown/crew/max
# Add member to existing group # Add member to group
gt mail group add <name> <member> gt mail group add <name> <member>
# Remove member from group # Remove member from group
@@ -93,18 +88,16 @@ gt mail group remove <name> <member>
gt mail group delete <name> gt mail group delete <name>
``` ```
**Source:** `internal/cmd/mail_group.go` ### Channel Management
### Channel Commands
```bash ```bash
# List all channels # List all channels
gt mail channel [--json] gt mail channel
gt mail channel list [--json] gt mail channel list
# View channel messages # View channel messages
gt mail channel <name> [--json] gt mail channel <name>
gt mail channel show <name> [--json] gt mail channel show <name>
# Create a channel with retention policy # Create a channel with retention policy
gt mail channel create <name> [--retain-count=N] [--retain-hours=N] gt mail channel create <name> [--retain-count=N] [--retain-hours=N]
@@ -114,113 +107,95 @@ gt mail channel create alerts --retain-count=100
gt mail channel delete <name> gt mail channel delete <name>
``` ```
**Source:** `internal/cmd/mail_channel.go`
### Sending Messages ### Sending Messages
The `gt mail send` command supports all address types through the address resolver: The `gt mail send` command now supports groups, queues, and channels:
```bash ```bash
# Send to agent (direct) # Send to a group (expands to all members)
gt mail send gastown/crew/max -s "Hello" -m "Message body" gt mail send my-group -s "Subject" -m "Body"
# Send to group (expands to all members) # Send to a queue (single message, workers claim)
gt mail send ops-team -s "Alert" -m "Important message" gt mail send queue:my-queue -s "Work item" -m "Details"
gt mail send group:ops-team -s "Alert" -m "Explicit group syntax"
# Send to queue (delivered to one claiming worker) # Send to a channel (broadcast with retention)
gt mail send queue:build-queue -s "Job" -m "Build request" gt mail send channel:my-channel -s "Announcement" -m "Content"
# Send to channel (broadcast, retained) # Direct address (unchanged)
gt mail send channel:alerts -s "Alert" -m "System alert" gt mail send gastown/crew/max -s "Hello" -m "World"
# Send to pattern (wildcards)
gt mail send "*/witness" -s "Witness alert" -m "All witnesses"
``` ```
**Source:** `internal/cmd/mail_send.go`
## Address Resolution ## Address Resolution
The address resolver (`internal/mail/resolve.go`) determines how to route messages based on the address format: When sending mail, addresses are resolved in this order:
### Resolution Order
1. **Explicit prefix** - If address starts with `group:`, `queue:`, or `channel:`, use that type directly 1. **Explicit prefix** - If address starts with `group:`, `queue:`, or `channel:`, use that type directly
2. **Contains `/`** - Treat as agent address or pattern (direct delivery) 2. **Contains `/`** - Treat as agent address or pattern (direct delivery)
3. **Starts with `@`** - Check for beads group, then fall back to built-in patterns 3. **Starts with `@`** - Special pattern (`@town`, `@crew`, etc.) or beads-native group
4. **Name lookup** - Search in order: group → queue → channel 4. **Name lookup** - Search for group → queue → channel by name
### Address Formats If a name matches multiple types (e.g., both a group and a channel named "alerts"), the resolver returns an error and requires an explicit prefix.
| Format | Type | Example | ## Key Implementation Files
|--------|------|---------|
| `group:<name>` | Group | `group:ops-team` |
| `queue:<name>` | Queue | `queue:build-queue` |
| `channel:<name>` | Channel | `channel:alerts` |
| `<town>/<role>/<name>` | Agent | `gastown/crew/max` |
| `<town>/<role>` | Agent | `gastown/witness` |
| `*/<role>` | Pattern | `*/witness` (all witnesses) |
| `@<name>` | Group/Pattern | `@ops-team` |
### Conflict Handling
If a name matches multiple types (e.g., both a group and a channel named "alerts"), the resolver returns an error requiring an explicit prefix:
```
ambiguous address "alerts": matches multiple types. Use explicit prefix: group:alerts, channel:alerts
```
## Key Files
| File | Description | | File | Description |
|------|-------------| |------|-------------|
| `internal/beads/beads_group.go` | Group bead CRUD operations | | `internal/beads/beads_group.go` | Group bead CRUD operations |
| `internal/beads/beads_queue.go` | Queue bead CRUD operations | | `internal/beads/beads_queue.go` | Queue bead CRUD operations |
| `internal/beads/beads_channel.go` | Channel bead CRUD + retention | | `internal/beads/beads_channel.go` | Channel bead + retention logic |
| `internal/mail/resolve.go` | Address resolution logic | | `internal/mail/resolve.go` | Address resolution logic |
| `internal/cmd/mail_group.go` | Group CLI commands | | `internal/cmd/mail_group.go` | Group CLI commands |
| `internal/cmd/mail_channel.go` | Channel CLI commands | | `internal/cmd/mail_channel.go` | Channel CLI commands |
| `internal/cmd/mail_send.go` | Send command with resolver | | `internal/cmd/mail_send.go` | Updated send with resolver |
## Retention Policy
Channels support two retention mechanisms:
- **Count-based** (`--retain-count=N`): Keep only the last N messages
- **Time-based** (`--retain-hours=N`): Delete messages older than N hours
Retention is enforced:
1. **On-write**: After posting a new message, old messages are pruned
2. **On-patrol**: Deacon patrol runs `PruneAllChannels()` as a backup cleanup
The patrol uses a 10% buffer to avoid thrashing (only prunes if count > retainCount × 1.1).
## Examples ## Examples
### Create a Team Distribution Group ### Create a team distribution group
```bash ```bash
# Create group # Create a group for the ops team
gt mail group create dev-team gastown/crew/max gastown/crew/dennis gt mail group create ops-team gastown/witness gastown/crew/max deacon/
# Add another member # Send to the group
gt mail group add dev-team gastown/crew/george gt mail send ops-team -s "Team meeting" -m "Tomorrow at 10am"
# Send to entire team # Add a new member
gt mail send dev-team -s "Standup" -m "Daily standup in 5 minutes" gt mail group add ops-team gastown/crew/dennis
``` ```
### Set Up a Build Alert Channel ### Set up an alerts channel
```bash ```bash
# Create channel with retention # Create an alerts channel that keeps last 50 messages
gt mail channel create build-alerts --retain-count=50 gt mail channel create alerts --retain-count=50
# Send build notifications # Send an alert
gt mail send channel:build-alerts -s "Build #123 passed" -m "All tests green" gt mail send channel:alerts -s "Build failed" -m "See CI for details"
# View channel history # View recent alerts
gt mail channel build-alerts gt mail channel alerts
``` ```
### Nested Groups ### Create nested groups
```bash ```bash
# Create base groups # Create role-based groups
gt mail group create witnesses gastown/witness ranchero/witness gt mail group create witnesses */witness
gt mail group create crew gastown/crew/max gastown/crew/dennis gt mail group create leads gastown/crew/max gastown/crew/dennis
# Create umbrella group that includes other groups # Create a group that includes other groups
gt mail group create all-agents witnesses crew deacon/ gt mail group create all-hands witnesses leads mayor/
# Send to everyone
gt mail send all-agents -s "Town meeting" -m "All hands meeting at noon"
``` ```

View File

@@ -8,6 +8,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/steveyegge/gastown/internal/beads"
"github.com/steveyegge/gastown/internal/config" "github.com/steveyegge/gastown/internal/config"
"github.com/steveyegge/gastown/internal/session" "github.com/steveyegge/gastown/internal/session"
"github.com/steveyegge/gastown/internal/tmux" "github.com/steveyegge/gastown/internal/tmux"
@@ -85,6 +86,16 @@ func parseAnnounceName(address string) string {
return strings.TrimPrefix(address, "announce:") return strings.TrimPrefix(address, "announce:")
} }
// isChannelAddress returns true if the address uses channel:name syntax (beads-native channels).
func isChannelAddress(address string) bool {
return strings.HasPrefix(address, "channel:")
}
// parseChannelName extracts the channel name from a channel:name address.
func parseChannelName(address string) string {
return strings.TrimPrefix(address, "channel:")
}
// expandFromConfig is a generic helper for config-based expansion. // expandFromConfig is a generic helper for config-based expansion.
// It loads the messaging config and calls the getter to extract the desired value. // It loads the messaging config and calls the getter to extract the desired value.
// This consolidates the common pattern of: check townRoot, load config, lookup in map. // This consolidates the common pattern of: check townRoot, load config, lookup in map.
@@ -515,6 +526,11 @@ func (r *Router) Send(msg *Message) error {
return r.sendToAnnounce(msg) return r.sendToAnnounce(msg)
} }
// Check for beads-native channel address - broadcast with retention
if isChannelAddress(msg.To) {
return r.sendToChannel(msg)
}
// Check for @group address - resolve and fan-out // Check for @group address - resolve and fan-out
if isGroupAddress(msg.To) { if isGroupAddress(msg.To) {
return r.sendToGroup(msg) return r.sendToGroup(msg)
@@ -795,6 +811,78 @@ func (r *Router) sendToAnnounce(msg *Message) error {
return nil return nil
} }
// sendToChannel delivers a message to a beads-native channel.
// Creates a message with channel:<name> label for channel queries.
// Retention is enforced by the channel's EnforceChannelRetention after message creation.
func (r *Router) sendToChannel(msg *Message) error {
channelName := parseChannelName(msg.To)
// Validate channel exists as a beads-native channel
if r.townRoot == "" {
return fmt.Errorf("town root not set, cannot send to channel: %s", channelName)
}
b := beads.New(r.townRoot)
_, fields, err := b.GetChannelBead(channelName)
if err != nil {
return fmt.Errorf("getting channel %s: %w", channelName, err)
}
if fields == nil {
return fmt.Errorf("channel not found: %s", channelName)
}
// Build labels for from/thread/reply-to/cc plus channel metadata
var labels []string
labels = append(labels, "from:"+msg.From)
labels = append(labels, "channel:"+channelName)
if msg.ThreadID != "" {
labels = append(labels, "thread:"+msg.ThreadID)
}
if msg.ReplyTo != "" {
labels = append(labels, "reply-to:"+msg.ReplyTo)
}
for _, cc := range msg.CC {
ccIdentity := addressToIdentity(cc)
labels = append(labels, "cc:"+ccIdentity)
}
// Build command: bd create <subject> --type=message --assignee=channel:<name> -d <body>
// Use channel:<name> as assignee so queries can filter by channel
args := []string{"create", msg.Subject,
"--type", "message",
"--assignee", msg.To, // channel:name
"-d", msg.Body,
}
// Add priority flag
beadsPriority := PriorityToBeads(msg.Priority)
args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority))
// Add labels (includes channel name for filtering)
if len(labels) > 0 {
args = append(args, "--labels", strings.Join(labels, ","))
}
// Add actor for attribution (sender identity)
args = append(args, "--actor", msg.From)
// Channel messages are never ephemeral - they persist according to retention policy
// (deliberately not checking shouldBeWisp)
// Channel messages go to town-level beads (shared location)
beadsDir := r.resolveBeadsDir("")
_, err = runBdCommand(args, filepath.Dir(beadsDir), beadsDir)
if err != nil {
return fmt.Errorf("sending to channel %s: %w", channelName, err)
}
// Enforce channel retention policy (on-write cleanup)
_ = b.EnforceChannelRetention(channelName)
// No notification for channel messages - readers poll or check on their own schedule
return nil
}
// pruneAnnounce deletes oldest messages from an announce channel to enforce retention. // pruneAnnounce deletes oldest messages from an announce channel to enforce retention.
// If the channel has >= retainCount messages, deletes the oldest until count < retainCount. // If the channel has >= retainCount messages, deletes the oldest until count < retainCount.
func (r *Router) pruneAnnounce(announceName string, retainCount int) error { func (r *Router) pruneAnnounce(announceName string, retainCount int) error {