diff --git a/cmd/bd/activity.go b/cmd/bd/activity.go index 6a05084b..9537b5dd 100644 --- a/cmd/bd/activity.go +++ b/cmd/bd/activity.go @@ -4,12 +4,15 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "regexp" + "sort" "strconv" "strings" "time" "github.com/spf13/cobra" + "github.com/steveyegge/beads/internal/routing" "github.com/steveyegge/beads/internal/rpc" "github.com/steveyegge/beads/internal/ui" ) @@ -21,6 +24,7 @@ var ( activityType string activityLimit int activityInterval time.Duration + activityTown bool ) // ActivityEvent represents a formatted activity event for output @@ -60,7 +64,9 @@ Examples: bd activity --since 5m # Events from last 5 minutes bd activity --since 1h # Events from last hour bd activity --type update # Only show updates - bd activity --limit 50 # Show last 50 events`, + bd activity --limit 50 # Show last 50 events + bd activity --town # Aggregated feed from all rigs + bd activity --follow --town # Stream all rig activity`, Run: runActivity, } @@ -71,18 +77,12 @@ func init() { activityCmd.Flags().StringVar(&activityType, "type", "", "Filter by event type (create, update, delete, comment)") activityCmd.Flags().IntVar(&activityLimit, "limit", 100, "Maximum number of events to show") activityCmd.Flags().DurationVar(&activityInterval, "interval", 500*time.Millisecond, "Polling interval for --follow mode") + activityCmd.Flags().BoolVar(&activityTown, "town", false, "Aggregated feed from all rigs (uses routes.jsonl)") rootCmd.AddCommand(activityCmd) } func runActivity(cmd *cobra.Command, args []string) { - // Activity requires daemon for mutation events - if daemonClient == nil { - fmt.Fprintln(os.Stderr, "Error: activity command requires daemon (mutations not available in direct mode)") - fmt.Fprintln(os.Stderr, "Hint: Start daemon with 'bd daemons start .' or remove --no-daemon flag") - os.Exit(1) - } - // Parse --since duration var sinceTime time.Time if activitySince != "" { @@ -94,6 +94,23 @@ func runActivity(cmd *cobra.Command, args []string) { sinceTime = time.Now().Add(-duration) } + // Town-wide aggregated feed + if activityTown { + if activityFollow { + runTownActivityFollow(sinceTime) + } else { + runTownActivityOnce(sinceTime) + } + return + } + + // Single-rig activity requires daemon + if daemonClient == nil { + fmt.Fprintln(os.Stderr, "Error: activity command requires daemon (mutations not available in direct mode)") + fmt.Fprintln(os.Stderr, "Hint: Start daemon with 'bd daemons start .' or remove --no-daemon flag") + os.Exit(1) + } + if activityFollow { runActivityFollow(sinceTime) } else { @@ -419,3 +436,247 @@ func parseDurationString(s string) (time.Duration, error) { return 0, fmt.Errorf("unknown duration unit: %s", unit) } } + +// rigDaemon holds a connection to a rig's daemon +type rigDaemon struct { + prefix string // e.g., "bd-" + rig string // e.g., "beads" + client *rpc.Client // nil if daemon not running +} + +// discoverRigDaemons finds all rigs via routes.jsonl and connects to their daemons +func discoverRigDaemons() []rigDaemon { + var daemons []rigDaemon + + // Find town beads directory (uses findTownBeadsDir from create.go) + townBeadsDir, err := findTownBeadsDir() + if err != nil { + fmt.Fprintf(os.Stderr, "Error: not in a Gas Town (%v)\n", err) + os.Exit(1) + } + + // Load routes + routes, err := routing.LoadRoutes(townBeadsDir) + if err != nil || len(routes) == 0 { + fmt.Fprintln(os.Stderr, "Error: no routes found in routes.jsonl") + os.Exit(1) + } + + townRoot := filepath.Dir(townBeadsDir) + + for _, route := range routes { + // Resolve beads directory for this route + var beadsDir string + if route.Path == "." { + beadsDir = townBeadsDir + } else { + beadsDir = filepath.Join(townRoot, route.Path, ".beads") + } + + // Follow redirect if present + beadsDir = resolveBeadsRedirect(beadsDir) + + // Check if daemon is running + socketPath := filepath.Join(beadsDir, "bd.sock") + client, _ := rpc.TryConnect(socketPath) + + rigName := routing.ExtractProjectFromPath(route.Path) + if rigName == "" { + rigName = "town" // For path="." + } + + daemons = append(daemons, rigDaemon{ + prefix: route.Prefix, + rig: rigName, + client: client, + }) + } + + return daemons +} + +// resolveBeadsRedirect follows a redirect file if present +func resolveBeadsRedirect(beadsDir string) string { + redirectFile := filepath.Join(beadsDir, "redirect") + data, err := os.ReadFile(redirectFile) + if err != nil { + return beadsDir + } + + redirectPath := strings.TrimSpace(string(data)) + if redirectPath == "" { + return beadsDir + } + + // Handle relative paths + if !filepath.IsAbs(redirectPath) { + redirectPath = filepath.Join(beadsDir, redirectPath) + } + + return filepath.Clean(redirectPath) +} + +// fetchTownMutations retrieves mutations from all rig daemons +func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEvent { + var allEvents []rpc.MutationEvent + + var sinceMillis int64 + if !since.IsZero() { + sinceMillis = since.UnixMilli() + } + + for _, d := range daemons { + if d.client == nil { + continue + } + + resp, err := d.client.GetMutations(&rpc.GetMutationsArgs{Since: sinceMillis}) + if err != nil { + continue + } + + var mutations []rpc.MutationEvent + if err := json.Unmarshal(resp.Data, &mutations); err != nil { + continue + } + + allEvents = append(allEvents, mutations...) + } + + // Sort by timestamp + sort.Slice(allEvents, func(i, j int) bool { + return allEvents[i].Timestamp.Before(allEvents[j].Timestamp) + }) + + return allEvents +} + +// runTownActivityOnce fetches and displays events from all rigs once +func runTownActivityOnce(sinceTime time.Time) { + daemons := discoverRigDaemons() + defer closeDaemons(daemons) + + // Count active daemons + activeCount := 0 + for _, d := range daemons { + if d.client != nil { + activeCount++ + } + } + + if activeCount == 0 { + fmt.Fprintln(os.Stderr, "Error: no rig daemons running") + fmt.Fprintln(os.Stderr, "Hint: Start daemons with 'bd daemons start' in each rig") + os.Exit(1) + } + + events := fetchTownMutations(daemons, sinceTime) + + // Apply filters and limit + events = filterEvents(events) + if len(events) > activityLimit { + events = events[len(events)-activityLimit:] + } + + if jsonOutput { + formatted := make([]ActivityEvent, 0, len(events)) + for _, e := range events { + formatted = append(formatted, formatEvent(e)) + } + outputJSON(formatted) + return + } + + if len(events) == 0 { + fmt.Printf("No recent activity across %d rigs\n", activeCount) + return + } + + for _, e := range events { + printEvent(e) + } +} + +// runTownActivityFollow streams events from all rigs in real-time +func runTownActivityFollow(sinceTime time.Time) { + daemons := discoverRigDaemons() + defer closeDaemons(daemons) + + // Count active daemons + activeCount := 0 + var activeRigs []string + for _, d := range daemons { + if d.client != nil { + activeCount++ + activeRigs = append(activeRigs, d.rig) + } + } + + if activeCount == 0 { + fmt.Fprintln(os.Stderr, "Error: no rig daemons running") + fmt.Fprintln(os.Stderr, "Hint: Start daemons with 'bd daemons start' in each rig") + os.Exit(1) + } + + // Show which rigs we're monitoring + if !jsonOutput { + fmt.Printf("Streaming activity from %d rigs: %s\n", activeCount, strings.Join(activeRigs, ", ")) + } + + // Start from now if no --since specified + lastPoll := time.Now().Add(-1 * time.Second) + if !sinceTime.IsZero() { + lastPoll = sinceTime + } + + // First fetch any events since the start time + events := fetchTownMutations(daemons, sinceTime) + events = filterEvents(events) + + for _, e := range events { + if jsonOutput { + data, _ := json.Marshal(formatEvent(e)) + fmt.Println(string(data)) + } else { + printEvent(e) + } + if e.Timestamp.After(lastPoll) { + lastPoll = e.Timestamp + } + } + + // Poll for new events + ticker := time.NewTicker(activityInterval) + defer ticker.Stop() + + for { + select { + case <-rootCtx.Done(): + return + case <-ticker.C: + newEvents := fetchTownMutations(daemons, lastPoll) + newEvents = filterEvents(newEvents) + + for _, e := range newEvents { + if jsonOutput { + data, _ := json.Marshal(formatEvent(e)) + fmt.Println(string(data)) + } else { + printEvent(e) + } + if e.Timestamp.After(lastPoll) { + lastPoll = e.Timestamp + } + } + } + } +} + +// closeDaemons closes all daemon connections +func closeDaemons(daemons []rigDaemon) { + for _, d := range daemons { + if d.client != nil { + d.client.Close() + } + } +}