diff --git a/internal/beads/beads_channel.go b/internal/beads/beads_channel.go new file mode 100644 index 00000000..83a6b7f5 --- /dev/null +++ b/internal/beads/beads_channel.go @@ -0,0 +1,377 @@ +// Package beads provides channel bead management for beads-native messaging. +// Channels are named pub/sub streams where messages are broadcast to subscribers. +package beads + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "strconv" + "strings" +) + +// ChannelFields holds structured fields for channel beads. +// These are stored as "key: value" lines in the description. +type ChannelFields struct { + Name string // Unique channel name (e.g., "alerts", "builds") + Subscribers []string // Addresses subscribed to this channel + Status string // active, closed + RetentionCount int // Number of recent messages to retain (0 = unlimited) + RetentionHours int // Hours to retain messages (0 = forever) + CreatedBy string // Who created the channel + CreatedAt string // ISO 8601 timestamp +} + +// Channel status constants +const ( + ChannelStatusActive = "active" + ChannelStatusClosed = "closed" +) + +// FormatChannelDescription creates a description string from channel fields. +func FormatChannelDescription(title string, fields *ChannelFields) string { + if fields == nil { + return title + } + + var lines []string + lines = append(lines, title) + lines = append(lines, "") + lines = append(lines, fmt.Sprintf("name: %s", fields.Name)) + + // Subscribers stored as comma-separated list + if len(fields.Subscribers) > 0 { + lines = append(lines, fmt.Sprintf("subscribers: %s", strings.Join(fields.Subscribers, ","))) + } else { + lines = append(lines, "subscribers: null") + } + + if fields.Status != "" { + lines = append(lines, fmt.Sprintf("status: %s", fields.Status)) + } else { + lines = append(lines, "status: active") + } + + lines = append(lines, fmt.Sprintf("retention_count: %d", fields.RetentionCount)) + lines = append(lines, fmt.Sprintf("retention_hours: %d", fields.RetentionHours)) + + if fields.CreatedBy != "" { + lines = append(lines, fmt.Sprintf("created_by: %s", fields.CreatedBy)) + } else { + lines = append(lines, "created_by: null") + } + + if fields.CreatedAt != "" { + lines = append(lines, fmt.Sprintf("created_at: %s", fields.CreatedAt)) + } else { + lines = append(lines, "created_at: null") + } + + return strings.Join(lines, "\n") +} + +// ParseChannelFields extracts channel fields from an issue's description. +func ParseChannelFields(description string) *ChannelFields { + fields := &ChannelFields{ + Status: ChannelStatusActive, + } + + for _, line := range strings.Split(description, "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + colonIdx := strings.Index(line, ":") + if colonIdx == -1 { + continue + } + + key := strings.TrimSpace(line[:colonIdx]) + value := strings.TrimSpace(line[colonIdx+1:]) + if value == "null" || value == "" { + value = "" + } + + switch strings.ToLower(key) { + case "name": + fields.Name = value + case "subscribers": + if value != "" { + // Parse comma-separated subscribers + for _, s := range strings.Split(value, ",") { + s = strings.TrimSpace(s) + if s != "" { + fields.Subscribers = append(fields.Subscribers, s) + } + } + } + case "status": + fields.Status = value + case "retention_count": + if v, err := strconv.Atoi(value); err == nil { + fields.RetentionCount = v + } + case "retention_hours": + if v, err := strconv.Atoi(value); err == nil { + fields.RetentionHours = v + } + case "created_by": + fields.CreatedBy = value + case "created_at": + fields.CreatedAt = value + } + } + + return fields +} + +// ChannelBeadID returns the bead ID for a channel name. +// Format: gt-channel- +func ChannelBeadID(name string) string { + return "gt-channel-" + name +} + +// CreateChannelBead creates a channel bead for pub/sub messaging. +// The ID format is: gt-channel- (e.g., gt-channel-alerts) +// The created_by field is populated from BD_ACTOR env var for provenance tracking. +func (b *Beads) CreateChannelBead(name string, subscribers []string, createdBy string) (*Issue, error) { + id := ChannelBeadID(name) + title := fmt.Sprintf("Channel: %s", name) + + fields := &ChannelFields{ + Name: name, + Subscribers: subscribers, + Status: ChannelStatusActive, + CreatedBy: createdBy, + } + + description := FormatChannelDescription(title, fields) + + args := []string{"create", "--json", + "--id=" + id, + "--title=" + title, + "--description=" + description, + "--type=task", // Channels use task type with gt:channel label + "--labels=gt:channel", + } + + // Default actor from BD_ACTOR env var for provenance tracking + if actor := os.Getenv("BD_ACTOR"); actor != "" { + args = append(args, "--actor="+actor) + } + + out, err := b.run(args...) + if err != nil { + return nil, err + } + + var issue Issue + if err := json.Unmarshal(out, &issue); err != nil { + return nil, fmt.Errorf("parsing bd create output: %w", err) + } + + return &issue, nil +} + +// GetChannelBead retrieves a channel bead by name. +// Returns nil, nil if not found. +func (b *Beads) GetChannelBead(name string) (*Issue, *ChannelFields, error) { + id := ChannelBeadID(name) + issue, err := b.Show(id) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, nil, nil + } + return nil, nil, err + } + + if !HasLabel(issue, "gt:channel") { + return nil, nil, fmt.Errorf("bead %s is not a channel bead (missing gt:channel label)", id) + } + + fields := ParseChannelFields(issue.Description) + return issue, fields, nil +} + +// GetChannelByID retrieves a channel bead by its full ID. +// Returns nil, nil if not found. +func (b *Beads) GetChannelByID(id string) (*Issue, *ChannelFields, error) { + issue, err := b.Show(id) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, nil, nil + } + return nil, nil, err + } + + if !HasLabel(issue, "gt:channel") { + return nil, nil, fmt.Errorf("bead %s is not a channel bead (missing gt:channel label)", id) + } + + fields := ParseChannelFields(issue.Description) + return issue, fields, nil +} + +// UpdateChannelSubscribers updates the subscribers list for a channel. +func (b *Beads) UpdateChannelSubscribers(name string, subscribers []string) error { + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if issue == nil { + return fmt.Errorf("channel %q not found", name) + } + + fields.Subscribers = subscribers + description := FormatChannelDescription(issue.Title, fields) + + return b.Update(issue.ID, UpdateOptions{Description: &description}) +} + +// SubscribeToChannel adds a subscriber to a channel if not already subscribed. +func (b *Beads) SubscribeToChannel(name string, subscriber string) error { + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if issue == nil { + return fmt.Errorf("channel %q not found", name) + } + + // Check if already subscribed + for _, s := range fields.Subscribers { + if s == subscriber { + return nil // Already subscribed + } + } + + fields.Subscribers = append(fields.Subscribers, subscriber) + description := FormatChannelDescription(issue.Title, fields) + + return b.Update(issue.ID, UpdateOptions{Description: &description}) +} + +// UnsubscribeFromChannel removes a subscriber from a channel. +func (b *Beads) UnsubscribeFromChannel(name string, subscriber string) error { + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if issue == nil { + return fmt.Errorf("channel %q not found", name) + } + + // Filter out the subscriber + var newSubscribers []string + for _, s := range fields.Subscribers { + if s != subscriber { + newSubscribers = append(newSubscribers, s) + } + } + + fields.Subscribers = newSubscribers + description := FormatChannelDescription(issue.Title, fields) + + return b.Update(issue.ID, UpdateOptions{Description: &description}) +} + +// UpdateChannelRetention updates the retention policy for a channel. +func (b *Beads) UpdateChannelRetention(name string, retentionCount, retentionHours int) error { + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if issue == nil { + return fmt.Errorf("channel %q not found", name) + } + + fields.RetentionCount = retentionCount + fields.RetentionHours = retentionHours + description := FormatChannelDescription(issue.Title, fields) + + return b.Update(issue.ID, UpdateOptions{Description: &description}) +} + +// UpdateChannelStatus updates the status of a channel bead. +func (b *Beads) UpdateChannelStatus(name, status string) error { + // Validate status + if status != ChannelStatusActive && status != ChannelStatusClosed { + return fmt.Errorf("invalid channel status %q: must be active or closed", status) + } + + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return err + } + if issue == nil { + return fmt.Errorf("channel %q not found", name) + } + + fields.Status = status + description := FormatChannelDescription(issue.Title, fields) + + return b.Update(issue.ID, UpdateOptions{Description: &description}) +} + +// DeleteChannelBead permanently deletes a channel bead. +func (b *Beads) DeleteChannelBead(name string) error { + id := ChannelBeadID(name) + _, err := b.run("delete", id, "--hard", "--force") + return err +} + +// ListChannelBeads returns all channel beads. +func (b *Beads) ListChannelBeads() (map[string]*ChannelFields, error) { + out, err := b.run("list", "--label=gt:channel", "--json") + if err != nil { + return nil, err + } + + var issues []*Issue + if err := json.Unmarshal(out, &issues); err != nil { + return nil, fmt.Errorf("parsing bd list output: %w", err) + } + + result := make(map[string]*ChannelFields, len(issues)) + for _, issue := range issues { + fields := ParseChannelFields(issue.Description) + if fields.Name != "" { + result[fields.Name] = fields + } + } + + return result, nil +} + +// LookupChannelByName finds a channel by its name field (not by ID). +// This is used for address resolution where we may not know the full bead ID. +func (b *Beads) LookupChannelByName(name string) (*Issue, *ChannelFields, error) { + // First try direct lookup by standard ID format + issue, fields, err := b.GetChannelBead(name) + if err != nil { + return nil, nil, err + } + if issue != nil { + return issue, fields, nil + } + + // If not found by ID, search all channels by name field + channels, err := b.ListChannelBeads() + if err != nil { + return nil, nil, err + } + + if fields, ok := channels[name]; ok { + // Found by name, now get the full issue + id := ChannelBeadID(name) + issue, err := b.Show(id) + if err != nil { + return nil, nil, err + } + return issue, fields, nil + } + + return nil, nil, nil // Not found +} diff --git a/internal/beads/beads_channel_test.go b/internal/beads/beads_channel_test.go new file mode 100644 index 00000000..18b472bd --- /dev/null +++ b/internal/beads/beads_channel_test.go @@ -0,0 +1,271 @@ +package beads + +import ( + "strings" + "testing" +) + +func TestFormatChannelDescription(t *testing.T) { + tests := []struct { + name string + title string + fields *ChannelFields + want []string // Lines that should be present + }{ + { + name: "basic channel", + title: "Channel: alerts", + fields: &ChannelFields{ + Name: "alerts", + Subscribers: []string{"gastown/crew/max", "gastown/witness"}, + Status: ChannelStatusActive, + CreatedBy: "human", + CreatedAt: "2024-01-15T10:00:00Z", + }, + want: []string{ + "Channel: alerts", + "name: alerts", + "subscribers: gastown/crew/max,gastown/witness", + "status: active", + "created_by: human", + "created_at: 2024-01-15T10:00:00Z", + }, + }, + { + name: "empty subscribers", + title: "Channel: empty", + fields: &ChannelFields{ + Name: "empty", + Subscribers: nil, + Status: ChannelStatusActive, + CreatedBy: "admin", + }, + want: []string{ + "name: empty", + "subscribers: null", + "created_by: admin", + }, + }, + { + name: "with retention", + title: "Channel: builds", + fields: &ChannelFields{ + Name: "builds", + Subscribers: []string{"*/witness"}, + RetentionCount: 100, + RetentionHours: 24, + }, + want: []string{ + "name: builds", + "retention_count: 100", + "retention_hours: 24", + }, + }, + { + name: "closed channel", + title: "Channel: old", + fields: &ChannelFields{ + Name: "old", + Status: ChannelStatusClosed, + }, + want: []string{ + "status: closed", + }, + }, + { + name: "nil fields", + title: "Just a title", + fields: nil, + want: []string{"Just a title"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := FormatChannelDescription(tt.title, tt.fields) + for _, line := range tt.want { + if !strings.Contains(got, line) { + t.Errorf("FormatChannelDescription() missing line %q\ngot:\n%s", line, got) + } + } + }) + } +} + +func TestParseChannelFields(t *testing.T) { + tests := []struct { + name string + description string + want *ChannelFields + }{ + { + name: "full channel", + description: `Channel: alerts + +name: alerts +subscribers: gastown/crew/max,gastown/witness,*/refinery +status: active +retention_count: 50 +retention_hours: 48 +created_by: human +created_at: 2024-01-15T10:00:00Z`, + want: &ChannelFields{ + Name: "alerts", + Subscribers: []string{"gastown/crew/max", "gastown/witness", "*/refinery"}, + Status: ChannelStatusActive, + RetentionCount: 50, + RetentionHours: 48, + CreatedBy: "human", + CreatedAt: "2024-01-15T10:00:00Z", + }, + }, + { + name: "null subscribers", + description: `Channel: empty + +name: empty +subscribers: null +status: active +created_by: admin`, + want: &ChannelFields{ + Name: "empty", + Subscribers: nil, + Status: ChannelStatusActive, + CreatedBy: "admin", + }, + }, + { + name: "single subscriber", + description: `name: solo +subscribers: gastown/crew/max +status: active`, + want: &ChannelFields{ + Name: "solo", + Subscribers: []string{"gastown/crew/max"}, + Status: ChannelStatusActive, + }, + }, + { + name: "empty description", + description: "", + want: &ChannelFields{ + Status: ChannelStatusActive, // Default + }, + }, + { + name: "subscribers with spaces", + description: `name: spaced +subscribers: a, b , c +status: active`, + want: &ChannelFields{ + Name: "spaced", + Subscribers: []string{"a", "b", "c"}, + Status: ChannelStatusActive, + }, + }, + { + name: "closed status", + description: `name: archived +status: closed`, + want: &ChannelFields{ + Name: "archived", + Status: ChannelStatusClosed, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := ParseChannelFields(tt.description) + if got.Name != tt.want.Name { + t.Errorf("Name = %q, want %q", got.Name, tt.want.Name) + } + if got.Status != tt.want.Status { + t.Errorf("Status = %q, want %q", got.Status, tt.want.Status) + } + if got.RetentionCount != tt.want.RetentionCount { + t.Errorf("RetentionCount = %d, want %d", got.RetentionCount, tt.want.RetentionCount) + } + if got.RetentionHours != tt.want.RetentionHours { + t.Errorf("RetentionHours = %d, want %d", got.RetentionHours, tt.want.RetentionHours) + } + if got.CreatedBy != tt.want.CreatedBy { + t.Errorf("CreatedBy = %q, want %q", got.CreatedBy, tt.want.CreatedBy) + } + if got.CreatedAt != tt.want.CreatedAt { + t.Errorf("CreatedAt = %q, want %q", got.CreatedAt, tt.want.CreatedAt) + } + if len(got.Subscribers) != len(tt.want.Subscribers) { + t.Errorf("Subscribers count = %d, want %d", len(got.Subscribers), len(tt.want.Subscribers)) + } else { + for i, s := range got.Subscribers { + if s != tt.want.Subscribers[i] { + t.Errorf("Subscribers[%d] = %q, want %q", i, s, tt.want.Subscribers[i]) + } + } + } + }) + } +} + +func TestChannelBeadID(t *testing.T) { + tests := []struct { + name string + want string + }{ + {"alerts", "gt-channel-alerts"}, + {"builds", "gt-channel-builds"}, + {"team-updates", "gt-channel-team-updates"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ChannelBeadID(tt.name); got != tt.want { + t.Errorf("ChannelBeadID(%q) = %q, want %q", tt.name, got, tt.want) + } + }) + } +} + +func TestChannelRoundTrip(t *testing.T) { + // Test that Format -> Parse preserves data + original := &ChannelFields{ + Name: "test-channel", + Subscribers: []string{"gastown/crew/max", "*/witness", "@town"}, + Status: ChannelStatusActive, + RetentionCount: 100, + RetentionHours: 72, + CreatedBy: "tester", + CreatedAt: "2024-01-15T12:00:00Z", + } + + description := FormatChannelDescription("Channel: test-channel", original) + parsed := ParseChannelFields(description) + + if parsed.Name != original.Name { + t.Errorf("Name: got %q, want %q", parsed.Name, original.Name) + } + if parsed.Status != original.Status { + t.Errorf("Status: got %q, want %q", parsed.Status, original.Status) + } + if parsed.RetentionCount != original.RetentionCount { + t.Errorf("RetentionCount: got %d, want %d", parsed.RetentionCount, original.RetentionCount) + } + if parsed.RetentionHours != original.RetentionHours { + t.Errorf("RetentionHours: got %d, want %d", parsed.RetentionHours, original.RetentionHours) + } + if parsed.CreatedBy != original.CreatedBy { + t.Errorf("CreatedBy: got %q, want %q", parsed.CreatedBy, original.CreatedBy) + } + if parsed.CreatedAt != original.CreatedAt { + t.Errorf("CreatedAt: got %q, want %q", parsed.CreatedAt, original.CreatedAt) + } + if len(parsed.Subscribers) != len(original.Subscribers) { + t.Fatalf("Subscribers count: got %d, want %d", len(parsed.Subscribers), len(original.Subscribers)) + } + for i, s := range original.Subscribers { + if parsed.Subscribers[i] != s { + t.Errorf("Subscribers[%d]: got %q, want %q", i, parsed.Subscribers[i], s) + } + } +}