diff --git a/internal/mail/router.go b/internal/mail/router.go index 8ba19a20..60112ae3 100644 --- a/internal/mail/router.go +++ b/internal/mail/router.go @@ -134,6 +134,28 @@ func (r *Router) expandQueue(queueName string) (*config.QueueConfig, error) { return &queueCfg, nil } +// expandAnnounce returns the AnnounceConfig for an announce channel name. +// Returns ErrUnknownAnnounce if the channel is not found. +func (r *Router) expandAnnounce(announceName string) (*config.AnnounceConfig, error) { + // Load messaging config from town root + if r.townRoot == "" { + return nil, fmt.Errorf("%w: %s (no town root)", ErrUnknownAnnounce, announceName) + } + + configPath := config.MessagingConfigPath(r.townRoot) + cfg, err := config.LoadMessagingConfig(configPath) + if err != nil { + return nil, fmt.Errorf("loading messaging config: %w", err) + } + + announceCfg, ok := cfg.Announces[announceName] + if !ok { + return nil, fmt.Errorf("%w: %s", ErrUnknownAnnounce, announceName) + } + + return &announceCfg, nil +} + // detectTownRoot finds the town root by looking for mayor/town.json. func detectTownRoot(startDir string) string { dir := startDir @@ -491,8 +513,9 @@ func (r *Router) shouldBeWisp(msg *Message) bool { // Supports fan-out for: // - Mailing lists (list:name) - fans out to all list members // - @group addresses - resolves and fans out to matching agents -// Supports queue delivery for: +// Supports single-copy delivery for: // - Queues (queue:name) - stores single message for worker claiming +// - Announces (announce:name) - bulletin board, no claiming, retention-limited func (r *Router) Send(msg *Message) error { // Check for mailing list address if isListAddress(msg.To) { @@ -504,6 +527,11 @@ func (r *Router) Send(msg *Message) error { return r.sendToQueue(msg) } + // Check for announce address - bulletin board (single copy, no claiming) + if isAnnounceAddress(msg.To) { + return r.sendToAnnounce(msg) + } + // Check for @group address - resolve and fan-out if isGroupAddress(msg.To) { return r.sendToGroup(msg) @@ -737,6 +765,155 @@ func (r *Router) sendToQueue(msg *Message) error { return nil } +// sendToAnnounce delivers a message to an announce channel (bulletin board). +// Unlike sendToQueue, no claiming is supported - messages persist until retention limit. +// ONE copy is stored in town-level beads with announce_channel metadata. +func (r *Router) sendToAnnounce(msg *Message) error { + announceName := parseAnnounceName(msg.To) + + // Validate announce channel exists and get config + announceCfg, err := r.expandAnnounce(announceName) + if err != nil { + return err + } + + // Apply retention pruning BEFORE creating new message + if announceCfg.RetainCount > 0 { + if err := r.pruneAnnounce(announceName, announceCfg.RetainCount); err != nil { + // Log but don't fail - pruning is best-effort + // The new message should still be created + _ = err + } + } + + // Build labels for from/thread/reply-to/cc plus announce metadata + var labels []string + labels = append(labels, "from:"+msg.From) + labels = append(labels, "announce:"+announceName) + if msg.ThreadID != "" { + labels = append(labels, "thread:"+msg.ThreadID) + } + if msg.ReplyTo != "" { + labels = append(labels, "reply-to:"+msg.ReplyTo) + } + for _, cc := range msg.CC { + ccIdentity := addressToIdentity(cc) + labels = append(labels, "cc:"+ccIdentity) + } + + // Build command: bd create --type=message --assignee=announce: -d + // Use announce: as assignee so queries can filter by channel + args := []string{"create", msg.Subject, + "--type", "message", + "--assignee", msg.To, // announce:name + "-d", msg.Body, + } + + // Add priority flag + beadsPriority := PriorityToBeads(msg.Priority) + args = append(args, "--priority", fmt.Sprintf("%d", beadsPriority)) + + // Add labels (includes announce name for filtering) + if len(labels) > 0 { + args = append(args, "--labels", strings.Join(labels, ",")) + } + + // Add actor for attribution (sender identity) + args = append(args, "--actor", msg.From) + + // Announce messages are never ephemeral - they need to persist for readers + // (deliberately not checking shouldBeWisp) + + // Announce messages go to town-level beads (shared location) + beadsDir := r.resolveBeadsDir("") + cmd := exec.Command("bd", args...) + cmd.Env = append(cmd.Environ(), + "BEADS_DIR="+beadsDir, + ) + cmd.Dir = filepath.Dir(beadsDir) // Run in parent of .beads + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return errors.New(errMsg) + } + return fmt.Errorf("sending to announce %s: %w", announceName, err) + } + + // No notification for announce messages - readers poll or check on their own schedule + + return nil +} + +// pruneAnnounce deletes oldest messages from an announce channel to enforce retention. +// If the channel has >= retainCount messages, deletes the oldest until count < retainCount. +func (r *Router) pruneAnnounce(announceName string, retainCount int) error { + if retainCount <= 0 { + return nil // No retention limit + } + + beadsDir := r.resolveBeadsDir("") + + // Query existing messages in this announce channel + // Use bd list with labels filter to find messages with announce: label + args := []string{"list", + "--type=message", + "--labels=announce:" + announceName, + "--json", + "--limit=0", // Get all + "--sort=created", + "--asc", // Oldest first + } + + cmd := exec.Command("bd", args...) + cmd.Env = append(cmd.Environ(), "BEADS_DIR="+beadsDir) + cmd.Dir = filepath.Dir(beadsDir) + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + errMsg := strings.TrimSpace(stderr.String()) + if errMsg != "" { + return errors.New(errMsg) + } + return fmt.Errorf("querying announce messages: %w", err) + } + + // Parse message list + var messages []struct { + ID string `json:"id"` + } + if err := json.Unmarshal(stdout.Bytes(), &messages); err != nil { + return fmt.Errorf("parsing announce messages: %w", err) + } + + // Calculate how many to delete (we're about to add 1 more) + // If we have N messages and retainCount is R, we need to keep at most R-1 after pruning + // so the new message makes it exactly R + toDelete := len(messages) - (retainCount - 1) + if toDelete <= 0 { + return nil // No pruning needed + } + + // Delete oldest messages + for i := 0; i < toDelete && i < len(messages); i++ { + deleteArgs := []string{"close", messages[i].ID, "--reason=retention pruning"} + deleteCmd := exec.Command("bd", deleteArgs...) + deleteCmd.Env = append(deleteCmd.Environ(), "BEADS_DIR="+beadsDir) + deleteCmd.Dir = filepath.Dir(beadsDir) + + // Best-effort deletion - don't fail if one delete fails + _ = deleteCmd.Run() + } + + return nil +} + // isSelfMail returns true if sender and recipient are the same identity. // Normalizes addresses by removing trailing slashes for comparison. func isSelfMail(from, to string) bool { diff --git a/internal/mail/router_test.go b/internal/mail/router_test.go index 494ec257..425485b7 100644 --- a/internal/mail/router_test.go +++ b/internal/mail/router_test.go @@ -719,3 +719,96 @@ func TestAgentBeadToAddress(t *testing.T) { }) } } + +func TestExpandAnnounce(t *testing.T) { + // Create temp directory with messaging config + tmpDir := t.TempDir() + configDir := filepath.Join(tmpDir, "config") + if err := os.MkdirAll(configDir, 0755); err != nil { + t.Fatal(err) + } + + // Write messaging.json with test announces + configContent := `{ + "type": "messaging", + "version": 1, + "announces": { + "alerts": {"readers": ["@town"], "retain_count": 10}, + "status/gastown": {"readers": ["gastown/witness", "mayor/"], "retain_count": 5} + } +}` + if err := os.WriteFile(filepath.Join(configDir, "messaging.json"), []byte(configContent), 0644); err != nil { + t.Fatal(err) + } + + r := NewRouterWithTownRoot(tmpDir, tmpDir) + + tests := []struct { + name string + announceName string + wantReaders []string + wantRetain int + wantErr bool + errString string + }{ + { + name: "alerts announce", + announceName: "alerts", + wantReaders: []string{"@town"}, + wantRetain: 10, + }, + { + name: "status/gastown announce", + announceName: "status/gastown", + wantReaders: []string{"gastown/witness", "mayor/"}, + wantRetain: 5, + }, + { + name: "unknown announce", + announceName: "nonexistent", + wantErr: true, + errString: "unknown announce channel", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := r.expandAnnounce(tt.announceName) + if tt.wantErr { + if err == nil { + t.Errorf("expandAnnounce(%q) expected error, got nil", tt.announceName) + } else if tt.errString != "" && !contains(err.Error(), tt.errString) { + t.Errorf("expandAnnounce(%q) error = %v, want containing %q", tt.announceName, err, tt.errString) + } + return + } + if err != nil { + t.Errorf("expandAnnounce(%q) unexpected error: %v", tt.announceName, err) + return + } + if len(got.Readers) != len(tt.wantReaders) { + t.Errorf("expandAnnounce(%q).Readers = %v, want %v", tt.announceName, got.Readers, tt.wantReaders) + return + } + for i, reader := range got.Readers { + if reader != tt.wantReaders[i] { + t.Errorf("expandAnnounce(%q).Readers[%d] = %q, want %q", tt.announceName, i, reader, tt.wantReaders[i]) + } + } + if got.RetainCount != tt.wantRetain { + t.Errorf("expandAnnounce(%q).RetainCount = %d, want %d", tt.announceName, got.RetainCount, tt.wantRetain) + } + }) + } +} + +func TestExpandAnnounceNoTownRoot(t *testing.T) { + r := &Router{workDir: "/tmp", townRoot: ""} + _, err := r.expandAnnounce("alerts") + if err == nil { + t.Error("expandAnnounce with no townRoot should error") + } + if !contains(err.Error(), "no town root") { + t.Errorf("expandAnnounce error = %v, want containing 'no town root'", err) + } +}