feat: add --town flag to bd activity for aggregated cross-rig feed (bd-dx6e)

Adds --town flag that:
- Discovers all rigs via routes.jsonl
- Connects to each rig daemon
- Aggregates mutations from all daemons
- Sorts by timestamp for unified feed
- Works with --follow for real-time streaming

Usage:
  bd activity --town              # Aggregated feed from all rigs
  bd activity --follow --town     # Stream all rig activity

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-29 21:09:39 -08:00
parent 35ffeea804
commit 26e491d7d4

View File

@@ -4,12 +4,15 @@ import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/spf13/cobra"
"github.com/steveyegge/beads/internal/routing"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/ui"
)
@@ -21,6 +24,7 @@ var (
activityType string
activityLimit int
activityInterval time.Duration
activityTown bool
)
// ActivityEvent represents a formatted activity event for output
@@ -60,7 +64,9 @@ Examples:
bd activity --since 5m # Events from last 5 minutes
bd activity --since 1h # Events from last hour
bd activity --type update # Only show updates
bd activity --limit 50 # Show last 50 events`,
bd activity --limit 50 # Show last 50 events
bd activity --town # Aggregated feed from all rigs
bd activity --follow --town # Stream all rig activity`,
Run: runActivity,
}
@@ -71,18 +77,12 @@ func init() {
activityCmd.Flags().StringVar(&activityType, "type", "", "Filter by event type (create, update, delete, comment)")
activityCmd.Flags().IntVar(&activityLimit, "limit", 100, "Maximum number of events to show")
activityCmd.Flags().DurationVar(&activityInterval, "interval", 500*time.Millisecond, "Polling interval for --follow mode")
activityCmd.Flags().BoolVar(&activityTown, "town", false, "Aggregated feed from all rigs (uses routes.jsonl)")
rootCmd.AddCommand(activityCmd)
}
func runActivity(cmd *cobra.Command, args []string) {
// Activity requires daemon for mutation events
if daemonClient == nil {
fmt.Fprintln(os.Stderr, "Error: activity command requires daemon (mutations not available in direct mode)")
fmt.Fprintln(os.Stderr, "Hint: Start daemon with 'bd daemons start .' or remove --no-daemon flag")
os.Exit(1)
}
// Parse --since duration
var sinceTime time.Time
if activitySince != "" {
@@ -94,6 +94,23 @@ func runActivity(cmd *cobra.Command, args []string) {
sinceTime = time.Now().Add(-duration)
}
// Town-wide aggregated feed
if activityTown {
if activityFollow {
runTownActivityFollow(sinceTime)
} else {
runTownActivityOnce(sinceTime)
}
return
}
// Single-rig activity requires daemon
if daemonClient == nil {
fmt.Fprintln(os.Stderr, "Error: activity command requires daemon (mutations not available in direct mode)")
fmt.Fprintln(os.Stderr, "Hint: Start daemon with 'bd daemons start .' or remove --no-daemon flag")
os.Exit(1)
}
if activityFollow {
runActivityFollow(sinceTime)
} else {
@@ -419,3 +436,247 @@ func parseDurationString(s string) (time.Duration, error) {
return 0, fmt.Errorf("unknown duration unit: %s", unit)
}
}
// rigDaemon holds a connection to a rig's daemon
type rigDaemon struct {
prefix string // e.g., "bd-"
rig string // e.g., "beads"
client *rpc.Client // nil if daemon not running
}
// discoverRigDaemons finds all rigs via routes.jsonl and connects to their daemons
func discoverRigDaemons() []rigDaemon {
var daemons []rigDaemon
// Find town beads directory (uses findTownBeadsDir from create.go)
townBeadsDir, err := findTownBeadsDir()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: not in a Gas Town (%v)\n", err)
os.Exit(1)
}
// Load routes
routes, err := routing.LoadRoutes(townBeadsDir)
if err != nil || len(routes) == 0 {
fmt.Fprintln(os.Stderr, "Error: no routes found in routes.jsonl")
os.Exit(1)
}
townRoot := filepath.Dir(townBeadsDir)
for _, route := range routes {
// Resolve beads directory for this route
var beadsDir string
if route.Path == "." {
beadsDir = townBeadsDir
} else {
beadsDir = filepath.Join(townRoot, route.Path, ".beads")
}
// Follow redirect if present
beadsDir = resolveBeadsRedirect(beadsDir)
// Check if daemon is running
socketPath := filepath.Join(beadsDir, "bd.sock")
client, _ := rpc.TryConnect(socketPath)
rigName := routing.ExtractProjectFromPath(route.Path)
if rigName == "" {
rigName = "town" // For path="."
}
daemons = append(daemons, rigDaemon{
prefix: route.Prefix,
rig: rigName,
client: client,
})
}
return daemons
}
// resolveBeadsRedirect follows a redirect file if present
func resolveBeadsRedirect(beadsDir string) string {
redirectFile := filepath.Join(beadsDir, "redirect")
data, err := os.ReadFile(redirectFile)
if err != nil {
return beadsDir
}
redirectPath := strings.TrimSpace(string(data))
if redirectPath == "" {
return beadsDir
}
// Handle relative paths
if !filepath.IsAbs(redirectPath) {
redirectPath = filepath.Join(beadsDir, redirectPath)
}
return filepath.Clean(redirectPath)
}
// fetchTownMutations retrieves mutations from all rig daemons
func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEvent {
var allEvents []rpc.MutationEvent
var sinceMillis int64
if !since.IsZero() {
sinceMillis = since.UnixMilli()
}
for _, d := range daemons {
if d.client == nil {
continue
}
resp, err := d.client.GetMutations(&rpc.GetMutationsArgs{Since: sinceMillis})
if err != nil {
continue
}
var mutations []rpc.MutationEvent
if err := json.Unmarshal(resp.Data, &mutations); err != nil {
continue
}
allEvents = append(allEvents, mutations...)
}
// Sort by timestamp
sort.Slice(allEvents, func(i, j int) bool {
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
})
return allEvents
}
// runTownActivityOnce fetches and displays events from all rigs once
func runTownActivityOnce(sinceTime time.Time) {
daemons := discoverRigDaemons()
defer closeDaemons(daemons)
// Count active daemons
activeCount := 0
for _, d := range daemons {
if d.client != nil {
activeCount++
}
}
if activeCount == 0 {
fmt.Fprintln(os.Stderr, "Error: no rig daemons running")
fmt.Fprintln(os.Stderr, "Hint: Start daemons with 'bd daemons start' in each rig")
os.Exit(1)
}
events := fetchTownMutations(daemons, sinceTime)
// Apply filters and limit
events = filterEvents(events)
if len(events) > activityLimit {
events = events[len(events)-activityLimit:]
}
if jsonOutput {
formatted := make([]ActivityEvent, 0, len(events))
for _, e := range events {
formatted = append(formatted, formatEvent(e))
}
outputJSON(formatted)
return
}
if len(events) == 0 {
fmt.Printf("No recent activity across %d rigs\n", activeCount)
return
}
for _, e := range events {
printEvent(e)
}
}
// runTownActivityFollow streams events from all rigs in real-time
func runTownActivityFollow(sinceTime time.Time) {
daemons := discoverRigDaemons()
defer closeDaemons(daemons)
// Count active daemons
activeCount := 0
var activeRigs []string
for _, d := range daemons {
if d.client != nil {
activeCount++
activeRigs = append(activeRigs, d.rig)
}
}
if activeCount == 0 {
fmt.Fprintln(os.Stderr, "Error: no rig daemons running")
fmt.Fprintln(os.Stderr, "Hint: Start daemons with 'bd daemons start' in each rig")
os.Exit(1)
}
// Show which rigs we're monitoring
if !jsonOutput {
fmt.Printf("Streaming activity from %d rigs: %s\n", activeCount, strings.Join(activeRigs, ", "))
}
// Start from now if no --since specified
lastPoll := time.Now().Add(-1 * time.Second)
if !sinceTime.IsZero() {
lastPoll = sinceTime
}
// First fetch any events since the start time
events := fetchTownMutations(daemons, sinceTime)
events = filterEvents(events)
for _, e := range events {
if jsonOutput {
data, _ := json.Marshal(formatEvent(e))
fmt.Println(string(data))
} else {
printEvent(e)
}
if e.Timestamp.After(lastPoll) {
lastPoll = e.Timestamp
}
}
// Poll for new events
ticker := time.NewTicker(activityInterval)
defer ticker.Stop()
for {
select {
case <-rootCtx.Done():
return
case <-ticker.C:
newEvents := fetchTownMutations(daemons, lastPoll)
newEvents = filterEvents(newEvents)
for _, e := range newEvents {
if jsonOutput {
data, _ := json.Marshal(formatEvent(e))
fmt.Println(string(data))
} else {
printEvent(e)
}
if e.Timestamp.After(lastPoll) {
lastPoll = e.Timestamp
}
}
}
}
}
// closeDaemons closes all daemon connections
func closeDaemons(daemons []rigDaemon) {
for _, d := range daemons {
if d.client != nil {
d.client.Close()
}
}
}