feat(mail): Add sendToAnnounce() for bulletin board delivery (gt-q73h3)
Implements announce channel delivery in router.go: - Add isAnnounceAddress() and parseAnnounceName() helpers - Add ErrUnknownAnnounce error variable - Add expandAnnounce() to load AnnounceConfig from messaging.json - Add sendToAnnounce() for bulletin board delivery (single copy, no claiming) - Add pruneAnnounce() for retention-based message cleanup - Integrate announce routing in Send() Announce channels store ONE copy of each message (unlike lists which fan-out). Messages persist until retention limit is reached, with oldest messages pruned automatically when limit is exceeded. Also includes address helpers (gt-pn2fq dependency). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -134,6 +134,28 @@ func (r *Router) expandQueue(queueName string) (*config.QueueConfig, error) {
|
||||
return &queueCfg, nil
|
||||
}
|
||||
|
||||
// expandAnnounce returns the AnnounceConfig for an announce channel name.
|
||||
// Returns ErrUnknownAnnounce if the channel is not found.
|
||||
func (r *Router) expandAnnounce(announceName string) (*config.AnnounceConfig, error) {
|
||||
// Load messaging config from town root
|
||||
if r.townRoot == "" {
|
||||
return nil, fmt.Errorf("%w: %s (no town root)", ErrUnknownAnnounce, announceName)
|
||||
}
|
||||
|
||||
configPath := config.MessagingConfigPath(r.townRoot)
|
||||
cfg, err := config.LoadMessagingConfig(configPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading messaging config: %w", err)
|
||||
}
|
||||
|
||||
announceCfg, ok := cfg.Announces[announceName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %s", ErrUnknownAnnounce, announceName)
|
||||
}
|
||||
|
||||
return &announceCfg, nil
|
||||
}
|
||||
|
||||
// detectTownRoot finds the town root by looking for mayor/town.json.
|
||||
func detectTownRoot(startDir string) string {
|
||||
dir := startDir
|
||||
@@ -491,8 +513,9 @@ func (r *Router) shouldBeWisp(msg *Message) bool {
|
||||
// Supports fan-out for:
|
||||
// - Mailing lists (list:name) - fans out to all list members
|
||||
// - @group addresses - resolves and fans out to matching agents
|
||||
// Supports queue delivery for:
|
||||
// Supports single-copy delivery for:
|
||||
// - Queues (queue:name) - stores single message for worker claiming
|
||||
// - Announces (announce:name) - bulletin board, no claiming, retention-limited
|
||||
func (r *Router) Send(msg *Message) error {
|
||||
// Check for mailing list address
|
||||
if isListAddress(msg.To) {
|
||||
@@ -504,6 +527,11 @@ func (r *Router) Send(msg *Message) error {
|
||||
return r.sendToQueue(msg)
|
||||
}
|
||||
|
||||
// Check for announce address - bulletin board (single copy, no claiming)
|
||||
if isAnnounceAddress(msg.To) {
|
||||
return r.sendToAnnounce(msg)
|
||||
}
|
||||
|
||||
// Check for @group address - resolve and fan-out
|
||||
if isGroupAddress(msg.To) {
|
||||
return r.sendToGroup(msg)
|
||||
@@ -737,6 +765,155 @@ func (r *Router) sendToQueue(msg *Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendToAnnounce delivers a message to an announce channel (bulletin board).
|
||||
// Unlike sendToQueue, no claiming is supported - messages persist until retention limit.
|
||||
// ONE copy is stored in town-level beads with announce_channel metadata.
|
||||
func (r *Router) sendToAnnounce(msg *Message) error {
|
||||
announceName := parseAnnounceName(msg.To)
|
||||
|
||||
// Validate announce channel exists and get config
|
||||
announceCfg, err := r.expandAnnounce(announceName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Apply retention pruning BEFORE creating new message
|
||||
if announceCfg.RetainCount > 0 {
|
||||
if err := r.pruneAnnounce(announceName, announceCfg.RetainCount); err != nil {
|
||||
// Log but don't fail - pruning is best-effort
|
||||
// The new message should still be created
|
||||
_ = err
|
||||
}
|
||||
}
|
||||
|
||||
// Build labels for from/thread/reply-to/cc plus announce metadata
|
||||
var labels []string
|
||||
labels = append(labels, "from:"+msg.From)
|
||||
labels = append(labels, "announce:"+announceName)
|
||||
if msg.ThreadID != "" {
|
||||
labels = append(labels, "thread:"+msg.ThreadID)
|
||||
}
|
||||
if msg.ReplyTo != "" {
|
||||
labels = append(labels, "reply-to:"+msg.ReplyTo)
|
||||
}
|
||||
for _, cc := range msg.CC {
|
||||
ccIdentity := addressToIdentity(cc)
|
||||
labels = append(labels, "cc:"+ccIdentity)
|
||||
}
|
||||
|
||||
// Build command: bd create <subject> --type=message --assignee=announce:<name> -d <body>
|
||||
// Use announce:<name> as assignee so queries can filter by channel
|
||||
args := []string{"create", msg.Subject,
|
||||
"--type", "message",
|
||||
"--assignee", msg.To, // announce:name
|
||||
"-d", msg.Body,
|
||||
}
|
||||
|
||||
// Add priority flag
|
||||
beadsPriority := PriorityToBeads(msg.Priority)
|
||||
args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority))
|
||||
|
||||
// Add labels (includes announce name for filtering)
|
||||
if len(labels) > 0 {
|
||||
args = append(args, "--labels", strings.Join(labels, ","))
|
||||
}
|
||||
|
||||
// Add actor for attribution (sender identity)
|
||||
args = append(args, "--actor", msg.From)
|
||||
|
||||
// Announce messages are never ephemeral - they need to persist for readers
|
||||
// (deliberately not checking shouldBeWisp)
|
||||
|
||||
// Announce messages go to town-level beads (shared location)
|
||||
beadsDir := r.resolveBeadsDir("")
|
||||
cmd := exec.Command("bd", args...)
|
||||
cmd.Env = append(cmd.Environ(),
|
||||
"BEADS_DIR="+beadsDir,
|
||||
)
|
||||
cmd.Dir = filepath.Dir(beadsDir) // Run in parent of .beads
|
||||
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
errMsg := strings.TrimSpace(stderr.String())
|
||||
if errMsg != "" {
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
return fmt.Errorf("sending to announce %s: %w", announceName, err)
|
||||
}
|
||||
|
||||
// No notification for announce messages - readers poll or check on their own schedule
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// pruneAnnounce deletes oldest messages from an announce channel to enforce retention.
|
||||
// If the channel has >= retainCount messages, deletes the oldest until count < retainCount.
|
||||
func (r *Router) pruneAnnounce(announceName string, retainCount int) error {
|
||||
if retainCount <= 0 {
|
||||
return nil // No retention limit
|
||||
}
|
||||
|
||||
beadsDir := r.resolveBeadsDir("")
|
||||
|
||||
// Query existing messages in this announce channel
|
||||
// Use bd list with labels filter to find messages with announce:<name> label
|
||||
args := []string{"list",
|
||||
"--type=message",
|
||||
"--labels=announce:" + announceName,
|
||||
"--json",
|
||||
"--limit=0", // Get all
|
||||
"--sort=created",
|
||||
"--asc", // Oldest first
|
||||
}
|
||||
|
||||
cmd := exec.Command("bd", args...)
|
||||
cmd.Env = append(cmd.Environ(), "BEADS_DIR="+beadsDir)
|
||||
cmd.Dir = filepath.Dir(beadsDir)
|
||||
|
||||
var stdout, stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
|
||||
if err := cmd.Run(); err != nil {
|
||||
errMsg := strings.TrimSpace(stderr.String())
|
||||
if errMsg != "" {
|
||||
return errors.New(errMsg)
|
||||
}
|
||||
return fmt.Errorf("querying announce messages: %w", err)
|
||||
}
|
||||
|
||||
// Parse message list
|
||||
var messages []struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
if err := json.Unmarshal(stdout.Bytes(), &messages); err != nil {
|
||||
return fmt.Errorf("parsing announce messages: %w", err)
|
||||
}
|
||||
|
||||
// Calculate how many to delete (we're about to add 1 more)
|
||||
// If we have N messages and retainCount is R, we need to keep at most R-1 after pruning
|
||||
// so the new message makes it exactly R
|
||||
toDelete := len(messages) - (retainCount - 1)
|
||||
if toDelete <= 0 {
|
||||
return nil // No pruning needed
|
||||
}
|
||||
|
||||
// Delete oldest messages
|
||||
for i := 0; i < toDelete && i < len(messages); i++ {
|
||||
deleteArgs := []string{"close", messages[i].ID, "--reason=retention pruning"}
|
||||
deleteCmd := exec.Command("bd", deleteArgs...)
|
||||
deleteCmd.Env = append(deleteCmd.Environ(), "BEADS_DIR="+beadsDir)
|
||||
deleteCmd.Dir = filepath.Dir(beadsDir)
|
||||
|
||||
// Best-effort deletion - don't fail if one delete fails
|
||||
_ = deleteCmd.Run()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isSelfMail returns true if sender and recipient are the same identity.
|
||||
// Normalizes addresses by removing trailing slashes for comparison.
|
||||
func isSelfMail(from, to string) bool {
|
||||
|
||||
@@ -719,3 +719,96 @@ func TestAgentBeadToAddress(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandAnnounce(t *testing.T) {
|
||||
// Create temp directory with messaging config
|
||||
tmpDir := t.TempDir()
|
||||
configDir := filepath.Join(tmpDir, "config")
|
||||
if err := os.MkdirAll(configDir, 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Write messaging.json with test announces
|
||||
configContent := `{
|
||||
"type": "messaging",
|
||||
"version": 1,
|
||||
"announces": {
|
||||
"alerts": {"readers": ["@town"], "retain_count": 10},
|
||||
"status/gastown": {"readers": ["gastown/witness", "mayor/"], "retain_count": 5}
|
||||
}
|
||||
}`
|
||||
if err := os.WriteFile(filepath.Join(configDir, "messaging.json"), []byte(configContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r := NewRouterWithTownRoot(tmpDir, tmpDir)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
announceName string
|
||||
wantReaders []string
|
||||
wantRetain int
|
||||
wantErr bool
|
||||
errString string
|
||||
}{
|
||||
{
|
||||
name: "alerts announce",
|
||||
announceName: "alerts",
|
||||
wantReaders: []string{"@town"},
|
||||
wantRetain: 10,
|
||||
},
|
||||
{
|
||||
name: "status/gastown announce",
|
||||
announceName: "status/gastown",
|
||||
wantReaders: []string{"gastown/witness", "mayor/"},
|
||||
wantRetain: 5,
|
||||
},
|
||||
{
|
||||
name: "unknown announce",
|
||||
announceName: "nonexistent",
|
||||
wantErr: true,
|
||||
errString: "unknown announce channel",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := r.expandAnnounce(tt.announceName)
|
||||
if tt.wantErr {
|
||||
if err == nil {
|
||||
t.Errorf("expandAnnounce(%q) expected error, got nil", tt.announceName)
|
||||
} else if tt.errString != "" && !contains(err.Error(), tt.errString) {
|
||||
t.Errorf("expandAnnounce(%q) error = %v, want containing %q", tt.announceName, err, tt.errString)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("expandAnnounce(%q) unexpected error: %v", tt.announceName, err)
|
||||
return
|
||||
}
|
||||
if len(got.Readers) != len(tt.wantReaders) {
|
||||
t.Errorf("expandAnnounce(%q).Readers = %v, want %v", tt.announceName, got.Readers, tt.wantReaders)
|
||||
return
|
||||
}
|
||||
for i, reader := range got.Readers {
|
||||
if reader != tt.wantReaders[i] {
|
||||
t.Errorf("expandAnnounce(%q).Readers[%d] = %q, want %q", tt.announceName, i, reader, tt.wantReaders[i])
|
||||
}
|
||||
}
|
||||
if got.RetainCount != tt.wantRetain {
|
||||
t.Errorf("expandAnnounce(%q).RetainCount = %d, want %d", tt.announceName, got.RetainCount, tt.wantRetain)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpandAnnounceNoTownRoot(t *testing.T) {
|
||||
r := &Router{workDir: "/tmp", townRoot: ""}
|
||||
_, err := r.expandAnnounce("alerts")
|
||||
if err == nil {
|
||||
t.Error("expandAnnounce with no townRoot should error")
|
||||
}
|
||||
if !contains(err.Error(), "no town root") {
|
||||
t.Errorf("expandAnnounce error = %v, want containing 'no town root'", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user