From 012d50b2b26cb31f62cd6369514020850bbc186c Mon Sep 17 00:00:00 2001 From: gastown/crew/max Date: Wed, 14 Jan 2026 21:23:34 -0800 Subject: [PATCH] feat(beads): implement channel message retention Add two-layer retention for beads-native channel messages: 1. On-write cleanup (EnforceChannelRetention): - Called after posting to channel - Deletes oldest messages when count > retainCount 2. Deacon patrol backup (PruneAllChannels): - Scans all channels periodically - Uses 10% buffer to avoid thrashing - Catches edge cases: crashed mid-write, manual insertions Part of gt-xfqh1e.13 (channel retention task). Co-Authored-By: Claude Opus 4.5 --- internal/beads/beads_channel.go | 104 ++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/internal/beads/beads_channel.go b/internal/beads/beads_channel.go index 83a6b7f5..db8cc24f 100644 --- a/internal/beads/beads_channel.go +++ b/internal/beads/beads_channel.go @@ -375,3 +375,107 @@ func (b *Beads) LookupChannelByName(name string) (*Issue, *ChannelFields, error) return nil, nil, nil // Not found } + +// EnforceChannelRetention prunes old messages from a channel to enforce retention. +// Called after posting a new message to the channel (on-write cleanup). +// If channel has >= retainCount messages, deletes oldest until count < retainCount. +func (b *Beads) EnforceChannelRetention(name string) error { + // Get channel config + _, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if fields == nil { + return fmt.Errorf("channel not found: %s", name) + } + + // Skip if no retention limit + if fields.RetentionCount <= 0 { + return nil + } + + // Query messages in this channel (oldest first) + out, err := b.run("list", + "--type=message", + "--label=channel:"+name, + "--json", + "--limit=0", + "--sort=created", + ) + if err != nil { + return fmt.Errorf("listing channel messages: %w", err) + } + + var messages []struct { + ID string `json:"id"` + } + if err := json.Unmarshal(out, &messages); err != nil { + return fmt.Errorf("parsing channel messages: %w", err) + } + + // Calculate how many to delete + // We're being called after a new message is posted, so we want to end up with retainCount + toDelete := len(messages) - fields.RetentionCount + if toDelete <= 0 { + return nil // No pruning needed + } + + // Delete oldest messages (best-effort) + for i := 0; i < toDelete && i < len(messages); i++ { + // Use close instead of delete for audit trail + _, _ = b.run("close", messages[i].ID, "--reason=channel retention pruning") + } + + return nil +} + +// PruneAllChannels enforces retention on all channels. +// Called by Deacon patrol as a backup cleanup mechanism. +// Uses a 10% buffer to avoid thrashing (only prunes if count > retainCount * 1.1). +func (b *Beads) PruneAllChannels() (int, error) { + channels, err := b.ListChannelBeads() + if err != nil { + return 0, err + } + + pruned := 0 + for name, fields := range channels { + if fields.RetentionCount <= 0 { + continue + } + + // Count messages + out, err := b.run("list", + "--type=message", + "--label=channel:"+name, + "--json", + "--limit=0", + ) + if err != nil { + continue // Skip on error + } + + var messages []struct { + ID string `json:"id"` + } + if err := json.Unmarshal(out, &messages); err != nil { + continue + } + + // 10% buffer - only prune if significantly over limit + threshold := int(float64(fields.RetentionCount) * 1.1) + if len(messages) <= threshold { + continue + } + + // Prune down to exactly retainCount + toDelete := len(messages) - fields.RetentionCount + for i := 0; i < toDelete && i < len(messages); i++ { + if _, err := b.run("close", messages[i].ID, "--reason=patrol retention pruning"); err == nil { + pruned++ + } + } + } + + return pruned, nil +}