Wire witness patrol events to gt feed display
Add support for displaying gt events (from ~/gt/.events.jsonl) in the gt feed TUI, including witness patrol activity: - Add event symbols for patrol events (patrol_started, patrol_complete, polecat_checked, polecat_nudged, escalation_sent), merge events, and general gt events (sling, hook, handoff, mail, spawn, etc.) - Create GtEventsSource that parses .events.jsonl format with proper extraction of rig/role from actor paths and human-readable message generation from event payloads - Create CombinedSource that merges multiple event sources (bd activity and gt events) using fan-in pattern - Update feed command to use combined source for TUI mode - Add appropriate styling for new event types (nudges/escalations in red, patrol complete in green, etc.) Example gt feed output now shows: 09:45 ✓ witness: All polecats healthy 09:44 ⚡ witness: nudged nux (idle 10m) 09:40 🎯 mayor: slung gt-rbncw to furiosa (gt-rbncw) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -44,26 +44,34 @@ func init() {
|
||||
var feedCmd = &cobra.Command{
|
||||
Use: "feed",
|
||||
GroupID: GroupDiag,
|
||||
Short: "Show real-time activity feed from beads",
|
||||
Long: `Display a real-time feed of issue and molecule state changes.
|
||||
Short: "Show real-time activity feed from beads and gt events",
|
||||
Long: `Display a real-time feed of issue changes and agent activity.
|
||||
|
||||
By default, launches an interactive TUI dashboard with:
|
||||
- Agent tree (top): Shows all agents organized by role with latest activity
|
||||
- Event stream (bottom): Chronological feed you can scroll through
|
||||
- Vim-style navigation: j/k to scroll, tab to switch panels, q to quit
|
||||
|
||||
Use --plain for simple text output (wraps bd activity).
|
||||
The feed combines two event sources:
|
||||
- Beads activity: Issue creates, updates, completions (from bd activity)
|
||||
- GT events: Agent activity like patrol, sling, handoff (from .events.jsonl)
|
||||
|
||||
Use --plain for simple text output (wraps bd activity only).
|
||||
|
||||
Tmux Integration:
|
||||
Use --window to open the feed in a dedicated tmux window named 'feed'.
|
||||
This creates a persistent window you can cycle to with C-b n/p.
|
||||
|
||||
Event symbols:
|
||||
+ created/bonded - New issue or molecule created
|
||||
→ in_progress - Work started on an issue
|
||||
✓ completed - Issue closed or step completed
|
||||
✗ failed - Step or issue failed
|
||||
⊘ deleted - Issue removed
|
||||
+ created/bonded - New issue or molecule created
|
||||
→ in_progress - Work started on an issue
|
||||
✓ completed - Issue closed or step completed
|
||||
✗ failed - Step or issue failed
|
||||
⊘ deleted - Issue removed
|
||||
👁 patrol_started - Witness began patrol cycle
|
||||
⚡ polecat_nudged - Worker was nudged
|
||||
🎯 sling - Work was slung to worker
|
||||
🤝 handoff - Session handed off
|
||||
|
||||
MQ (Merge Queue) event symbols:
|
||||
⚙ merge_started - Refinery began processing an MR
|
||||
@@ -187,6 +195,12 @@ func runFeedDirect(workDir string, bdArgs []string) error {
|
||||
|
||||
// runFeedTUI runs the interactive TUI feed.
|
||||
func runFeedTUI(workDir string) error {
|
||||
// Must be in a Gas Town workspace
|
||||
townRoot, err := workspace.FindFromCwdOrError()
|
||||
if err != nil {
|
||||
return fmt.Errorf("not in a Gas Town workspace: %w", err)
|
||||
}
|
||||
|
||||
var sources []feed.EventSource
|
||||
|
||||
// Create event source from bd activity
|
||||
@@ -202,6 +216,12 @@ func runFeedTUI(workDir string) error {
|
||||
sources = append(sources, mqSource)
|
||||
}
|
||||
|
||||
// Create GT events source (optional - don't fail if not available)
|
||||
gtSource, err := feed.NewGtEventsSource(townRoot)
|
||||
if err == nil {
|
||||
sources = append(sources, gtSource)
|
||||
}
|
||||
|
||||
// Combine all sources
|
||||
multiSource := feed.NewMultiSource(sources...)
|
||||
defer multiSource.Close()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@@ -211,34 +212,34 @@ func parseBeadContext(beadID string) (actor, rig, role string) {
|
||||
return
|
||||
}
|
||||
|
||||
// JSONLSource reads events from a JSONL file (like .events.jsonl)
|
||||
type JSONLSource struct {
|
||||
file *os.File
|
||||
events chan Event
|
||||
cancel context.CancelFunc
|
||||
// GtEventsSource reads events from ~/gt/.events.jsonl (gt activity log)
|
||||
type GtEventsSource struct {
|
||||
file *os.File
|
||||
events chan Event
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// JSONLEvent is the structure of events in .events.jsonl
|
||||
type JSONLEvent struct {
|
||||
Timestamp string `json:"timestamp"`
|
||||
Type string `json:"type"`
|
||||
Actor string `json:"actor"`
|
||||
Target string `json:"target"`
|
||||
Message string `json:"message"`
|
||||
Rig string `json:"rig"`
|
||||
Role string `json:"role"`
|
||||
// GtEvent is the structure of events in .events.jsonl
|
||||
type GtEvent struct {
|
||||
Timestamp string `json:"ts"`
|
||||
Source string `json:"source"`
|
||||
Type string `json:"type"`
|
||||
Actor string `json:"actor"`
|
||||
Payload map[string]interface{} `json:"payload"`
|
||||
Visibility string `json:"visibility"`
|
||||
}
|
||||
|
||||
// NewJSONLSource creates a source that tails a JSONL file
|
||||
func NewJSONLSource(filePath string) (*JSONLSource, error) {
|
||||
file, err := os.Open(filePath)
|
||||
// NewGtEventsSource creates a source that tails ~/gt/.events.jsonl
|
||||
func NewGtEventsSource(townRoot string) (*GtEventsSource, error) {
|
||||
eventsPath := filepath.Join(townRoot, ".events.jsonl")
|
||||
file, err := os.Open(eventsPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
source := &JSONLSource{
|
||||
source := &GtEventsSource{
|
||||
file: file,
|
||||
events: make(chan Event, 100),
|
||||
cancel: cancel,
|
||||
@@ -250,7 +251,7 @@ func NewJSONLSource(filePath string) (*JSONLSource, error) {
|
||||
}
|
||||
|
||||
// tail follows the file and sends events
|
||||
func (s *JSONLSource) tail(ctx context.Context) {
|
||||
func (s *GtEventsSource) tail(ctx context.Context) {
|
||||
defer close(s.events)
|
||||
|
||||
// Seek to end for live tailing
|
||||
@@ -267,7 +268,7 @@ func (s *JSONLSource) tail(ctx context.Context) {
|
||||
case <-ticker.C:
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if event := parseJSONLLine(line); event != nil {
|
||||
if event := parseGtEventLine(line); event != nil {
|
||||
select {
|
||||
case s.events <- *event:
|
||||
default:
|
||||
@@ -279,44 +280,292 @@ func (s *JSONLSource) tail(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Events returns the event channel
|
||||
func (s *JSONLSource) Events() <-chan Event {
|
||||
func (s *GtEventsSource) Events() <-chan Event {
|
||||
return s.events
|
||||
}
|
||||
|
||||
// Close stops the source
|
||||
func (s *JSONLSource) Close() error {
|
||||
func (s *GtEventsSource) Close() error {
|
||||
s.cancel()
|
||||
return s.file.Close()
|
||||
}
|
||||
|
||||
// parseJSONLLine parses a JSONL event line
|
||||
func parseJSONLLine(line string) *Event {
|
||||
// parseGtEventLine parses a line from .events.jsonl
|
||||
func parseGtEventLine(line string) *Event {
|
||||
if strings.TrimSpace(line) == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var je JSONLEvent
|
||||
if err := json.Unmarshal([]byte(line), &je); err != nil {
|
||||
var ge GtEvent
|
||||
if err := json.Unmarshal([]byte(line), &ge); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
t, err := time.Parse(time.RFC3339, je.Timestamp)
|
||||
// Only show feed-visible events
|
||||
if ge.Visibility != "feed" && ge.Visibility != "both" {
|
||||
return nil
|
||||
}
|
||||
|
||||
t, err := time.Parse(time.RFC3339, ge.Timestamp)
|
||||
if err != nil {
|
||||
t = time.Now()
|
||||
}
|
||||
|
||||
// Extract rig from payload or actor
|
||||
rig := ""
|
||||
if ge.Payload != nil {
|
||||
if r, ok := ge.Payload["rig"].(string); ok {
|
||||
rig = r
|
||||
}
|
||||
}
|
||||
if rig == "" && ge.Actor != "" {
|
||||
// Extract rig from actor like "gastown/witness"
|
||||
parts := strings.Split(ge.Actor, "/")
|
||||
if len(parts) > 0 && parts[0] != "mayor" && parts[0] != "deacon" {
|
||||
rig = parts[0]
|
||||
}
|
||||
}
|
||||
|
||||
// Extract role from actor
|
||||
role := ""
|
||||
if ge.Actor != "" {
|
||||
parts := strings.Split(ge.Actor, "/")
|
||||
if len(parts) >= 2 {
|
||||
role = parts[len(parts)-1]
|
||||
// Check for known roles
|
||||
switch parts[len(parts)-1] {
|
||||
case "witness", "refinery":
|
||||
role = parts[len(parts)-1]
|
||||
default:
|
||||
// Could be polecat name - check second-to-last part
|
||||
if len(parts) >= 2 {
|
||||
switch parts[len(parts)-2] {
|
||||
case "polecats":
|
||||
role = "polecat"
|
||||
case "crew":
|
||||
role = "crew"
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if len(parts) == 1 {
|
||||
role = parts[0]
|
||||
}
|
||||
}
|
||||
|
||||
// Build message from event type and payload
|
||||
message := buildEventMessage(ge.Type, ge.Payload)
|
||||
|
||||
return &Event{
|
||||
Time: t,
|
||||
Type: je.Type,
|
||||
Actor: je.Actor,
|
||||
Target: je.Target,
|
||||
Message: je.Message,
|
||||
Rig: je.Rig,
|
||||
Role: je.Role,
|
||||
Type: ge.Type,
|
||||
Actor: ge.Actor,
|
||||
Target: getPayloadString(ge.Payload, "bead"),
|
||||
Message: message,
|
||||
Rig: rig,
|
||||
Role: role,
|
||||
Raw: line,
|
||||
}
|
||||
}
|
||||
|
||||
// buildEventMessage creates a human-readable message from event type and payload
|
||||
func buildEventMessage(eventType string, payload map[string]interface{}) string {
|
||||
switch eventType {
|
||||
case "patrol_started":
|
||||
count := getPayloadInt(payload, "polecat_count")
|
||||
if msg := getPayloadString(payload, "message"); msg != "" {
|
||||
return msg
|
||||
}
|
||||
if count > 0 {
|
||||
return fmt.Sprintf("patrol started (%d polecats)", count)
|
||||
}
|
||||
return "patrol started"
|
||||
|
||||
case "patrol_complete":
|
||||
count := getPayloadInt(payload, "polecat_count")
|
||||
if msg := getPayloadString(payload, "message"); msg != "" {
|
||||
return msg
|
||||
}
|
||||
if count > 0 {
|
||||
return fmt.Sprintf("patrol complete (%d polecats)", count)
|
||||
}
|
||||
return "patrol complete"
|
||||
|
||||
case "polecat_checked":
|
||||
polecat := getPayloadString(payload, "polecat")
|
||||
status := getPayloadString(payload, "status")
|
||||
if polecat != "" {
|
||||
if status != "" {
|
||||
return fmt.Sprintf("checked %s (%s)", polecat, status)
|
||||
}
|
||||
return fmt.Sprintf("checked %s", polecat)
|
||||
}
|
||||
return "polecat checked"
|
||||
|
||||
case "polecat_nudged":
|
||||
polecat := getPayloadString(payload, "polecat")
|
||||
reason := getPayloadString(payload, "reason")
|
||||
if polecat != "" {
|
||||
if reason != "" {
|
||||
return fmt.Sprintf("nudged %s: %s", polecat, reason)
|
||||
}
|
||||
return fmt.Sprintf("nudged %s", polecat)
|
||||
}
|
||||
return "polecat nudged"
|
||||
|
||||
case "escalation_sent":
|
||||
target := getPayloadString(payload, "target")
|
||||
to := getPayloadString(payload, "to")
|
||||
reason := getPayloadString(payload, "reason")
|
||||
if target != "" && to != "" {
|
||||
if reason != "" {
|
||||
return fmt.Sprintf("escalated %s to %s: %s", target, to, reason)
|
||||
}
|
||||
return fmt.Sprintf("escalated %s to %s", target, to)
|
||||
}
|
||||
return "escalation sent"
|
||||
|
||||
case "sling":
|
||||
bead := getPayloadString(payload, "bead")
|
||||
target := getPayloadString(payload, "target")
|
||||
if bead != "" && target != "" {
|
||||
return fmt.Sprintf("slung %s to %s", bead, target)
|
||||
}
|
||||
return "work slung"
|
||||
|
||||
case "hook":
|
||||
bead := getPayloadString(payload, "bead")
|
||||
if bead != "" {
|
||||
return fmt.Sprintf("hooked %s", bead)
|
||||
}
|
||||
return "bead hooked"
|
||||
|
||||
case "handoff":
|
||||
subject := getPayloadString(payload, "subject")
|
||||
if subject != "" {
|
||||
return fmt.Sprintf("handoff: %s", subject)
|
||||
}
|
||||
return "session handoff"
|
||||
|
||||
case "done":
|
||||
bead := getPayloadString(payload, "bead")
|
||||
if bead != "" {
|
||||
return fmt.Sprintf("done: %s", bead)
|
||||
}
|
||||
return "work done"
|
||||
|
||||
case "mail":
|
||||
subject := getPayloadString(payload, "subject")
|
||||
to := getPayloadString(payload, "to")
|
||||
if subject != "" {
|
||||
if to != "" {
|
||||
return fmt.Sprintf("→ %s: %s", to, subject)
|
||||
}
|
||||
return subject
|
||||
}
|
||||
return "mail sent"
|
||||
|
||||
case "merged":
|
||||
worker := getPayloadString(payload, "worker")
|
||||
if worker != "" {
|
||||
return fmt.Sprintf("merged work from %s", worker)
|
||||
}
|
||||
return "merged"
|
||||
|
||||
case "merge_failed":
|
||||
reason := getPayloadString(payload, "reason")
|
||||
if reason != "" {
|
||||
return fmt.Sprintf("merge failed: %s", reason)
|
||||
}
|
||||
return "merge failed"
|
||||
|
||||
default:
|
||||
if msg := getPayloadString(payload, "message"); msg != "" {
|
||||
return msg
|
||||
}
|
||||
return eventType
|
||||
}
|
||||
}
|
||||
|
||||
// getPayloadString extracts a string from payload
|
||||
func getPayloadString(payload map[string]interface{}, key string) string {
|
||||
if payload == nil {
|
||||
return ""
|
||||
}
|
||||
if v, ok := payload[key].(string); ok {
|
||||
return v
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// getPayloadInt extracts an int from payload
|
||||
func getPayloadInt(payload map[string]interface{}, key string) int {
|
||||
if payload == nil {
|
||||
return 0
|
||||
}
|
||||
if v, ok := payload[key].(float64); ok {
|
||||
return int(v)
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// CombinedSource merges events from multiple sources
|
||||
type CombinedSource struct {
|
||||
sources []EventSource
|
||||
events chan Event
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewCombinedSource creates a source that merges multiple event sources
|
||||
func NewCombinedSource(sources ...EventSource) *CombinedSource {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
combined := &CombinedSource{
|
||||
sources: sources,
|
||||
events: make(chan Event, 100),
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
// Fan-in from all sources
|
||||
for _, src := range sources {
|
||||
go func(s EventSource) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case event, ok := <-s.Events():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case combined.events <- event:
|
||||
default:
|
||||
// Drop if full
|
||||
}
|
||||
}
|
||||
}
|
||||
}(src)
|
||||
}
|
||||
|
||||
return combined
|
||||
}
|
||||
|
||||
// Events returns the combined event channel
|
||||
func (c *CombinedSource) Events() <-chan Event {
|
||||
return c.events
|
||||
}
|
||||
|
||||
// Close stops all sources
|
||||
func (c *CombinedSource) Close() error {
|
||||
c.cancel()
|
||||
var lastErr error
|
||||
for _, src := range c.sources {
|
||||
if err := src.Close(); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// FindBeadsDir finds the beads directory for the given working directory
|
||||
func FindBeadsDir(workDir string) (string, error) {
|
||||
// Walk up looking for .beads
|
||||
|
||||
@@ -123,15 +123,34 @@ var (
|
||||
|
||||
// Event symbols
|
||||
EventSymbols = map[string]string{
|
||||
"create": "+",
|
||||
"update": "→",
|
||||
"complete": "✓",
|
||||
"fail": "✗",
|
||||
"delete": "⊘",
|
||||
"pin": "📌",
|
||||
"create": "+",
|
||||
"update": "→",
|
||||
"complete": "✓",
|
||||
"fail": "✗",
|
||||
"delete": "⊘",
|
||||
"pin": "📌",
|
||||
// Witness patrol events
|
||||
"patrol_started": "👁",
|
||||
"patrol_complete": "✓",
|
||||
"polecat_checked": "·",
|
||||
"polecat_nudged": "⚡",
|
||||
"escalation_sent": "⬆",
|
||||
// Merge events
|
||||
"merge_started": "⚙",
|
||||
"merged": "✓",
|
||||
"merge_failed": "✗",
|
||||
"merge_skipped": "⊘",
|
||||
// General gt events
|
||||
"sling": "🎯",
|
||||
"hook": "🪝",
|
||||
"unhook": "↩",
|
||||
"handoff": "🤝",
|
||||
"done": "✓",
|
||||
"mail": "✉",
|
||||
"spawn": "🚀",
|
||||
"kill": "💀",
|
||||
"nudge": "⚡",
|
||||
"boot": "🔌",
|
||||
"halt": "⏹",
|
||||
}
|
||||
)
|
||||
|
||||
@@ -249,20 +249,24 @@ func (m *Model) renderEvent(e Event) string {
|
||||
symbolStyle = EventCreateStyle
|
||||
case "update":
|
||||
symbolStyle = EventUpdateStyle
|
||||
case "complete":
|
||||
case "complete", "patrol_complete", "merged", "done":
|
||||
symbolStyle = EventCompleteStyle
|
||||
case "fail":
|
||||
case "fail", "merge_failed":
|
||||
symbolStyle = EventFailStyle
|
||||
case "delete":
|
||||
symbolStyle = EventDeleteStyle
|
||||
case "merge_started":
|
||||
symbolStyle = EventMergeStartedStyle
|
||||
case "merged":
|
||||
symbolStyle = EventMergedStyle
|
||||
case "merge_failed":
|
||||
symbolStyle = EventMergeFailedStyle
|
||||
case "merge_skipped":
|
||||
symbolStyle = EventMergeSkippedStyle
|
||||
case "patrol_started", "polecat_checked":
|
||||
symbolStyle = EventUpdateStyle
|
||||
case "polecat_nudged", "escalation_sent", "nudge":
|
||||
symbolStyle = EventFailStyle // Use red/warning style for nudges and escalations
|
||||
case "sling", "hook", "spawn", "boot":
|
||||
symbolStyle = EventCreateStyle
|
||||
case "handoff", "mail":
|
||||
symbolStyle = EventUpdateStyle
|
||||
default:
|
||||
symbolStyle = EventUpdateStyle
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user