diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 46038def..72ee3884 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -15,6 +15,7 @@ import ( "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/constants" + "github.com/steveyegge/gastown/internal/feed" "github.com/steveyegge/gastown/internal/keepalive" "github.com/steveyegge/gastown/internal/polecat" "github.com/steveyegge/gastown/internal/tmux" @@ -25,11 +26,12 @@ import ( // This is recovery-focused: normal wake is handled by feed subscription (bd activity --follow). // The daemon is the safety net for dead sessions, GUPP violations, and orphaned work. type Daemon struct { - config *Config - tmux *tmux.Tmux - logger *log.Logger - ctx context.Context - cancel context.CancelFunc + config *Config + tmux *tmux.Tmux + logger *log.Logger + ctx context.Context + cancel context.CancelFunc + curator *feed.Curator } // New creates a new daemon instance. @@ -90,6 +92,14 @@ func (d *Daemon) Run() error { d.logger.Printf("Daemon running, initial heartbeat interval %v", nextInterval) + // Start feed curator goroutine + d.curator = feed.NewCurator(d.config.TownRoot) + if err := d.curator.Start(); err != nil { + d.logger.Printf("Warning: failed to start feed curator: %v", err) + } else { + d.logger.Println("Feed curator started") + } + // Initial heartbeat d.heartbeat(state) @@ -392,6 +402,12 @@ func (d *Daemon) processLifecycleRequests() { func (d *Daemon) shutdown(state *State) error { d.logger.Println("Daemon shutting down") + // Stop feed curator + if d.curator != nil { + d.curator.Stop() + d.logger.Println("Feed curator stopped") + } + state.Running = false if err := SaveState(d.config.TownRoot, state); err != nil { d.logger.Printf("Warning: failed to save final state: %v", err) diff --git a/internal/feed/curator.go b/internal/feed/curator.go new file mode 100644 index 00000000..b060567d --- /dev/null +++ b/internal/feed/curator.go @@ -0,0 +1,352 @@ +// Package feed provides the feed daemon that curates raw events into a user-facing feed. +// +// The curator: +// 1. Tails ~/gt/.events.jsonl (raw events) +// 2. Filters by visibility tag (drops audit-only events) +// 3. Deduplicates repeated updates (5 molecule updates → "agent active") +// 4. Aggregates related events (3 issues closed → "batch complete") +// 5. Writes curated events to ~/gt/.feed.jsonl +package feed + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/steveyegge/gastown/internal/events" +) + +// FeedFile is the name of the curated feed file. +const FeedFile = ".feed.jsonl" + +// FeedEvent is the structure of events written to the feed. +type FeedEvent struct { + Timestamp string `json:"ts"` + Source string `json:"source"` + Type string `json:"type"` + Actor string `json:"actor"` + Summary string `json:"summary"` + Payload map[string]interface{} `json:"payload,omitempty"` + Count int `json:"count,omitempty"` // For aggregated events +} + +// Curator manages the feed curation process. +type Curator struct { + townRoot string + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + + // Deduplication state + mu sync.Mutex + recentDone map[string]time.Time // actor → last done time (dedupe repeated done events) + recentSling map[string][]slingRecord // actor → recent slings (aggregate) + recentMail map[string]int // actor → mail count in window (aggregate) +} + +type slingRecord struct { + target string + ts time.Time +} + +// Deduplication/aggregation settings +const ( + // Dedupe window for repeated done events from same actor + doneDedupeWindow = 10 * time.Second + + // Aggregation window for sling events + slingAggregateWindow = 30 * time.Second + + // Mail aggregation window + mailAggregateWindow = 30 * time.Second + + // Minimum events to trigger aggregation + minAggregateCount = 3 +) + +// NewCurator creates a new feed curator. +func NewCurator(townRoot string) *Curator { + ctx, cancel := context.WithCancel(context.Background()) + return &Curator{ + townRoot: townRoot, + ctx: ctx, + cancel: cancel, + recentDone: make(map[string]time.Time), + recentSling: make(map[string][]slingRecord), + recentMail: make(map[string]int), + } +} + +// Start begins the curator goroutine. +func (c *Curator) Start() error { + eventsPath := filepath.Join(c.townRoot, events.EventsFile) + + // Open events file, creating if needed + file, err := os.OpenFile(eventsPath, os.O_RDONLY|os.O_CREATE, 0644) + if err != nil { + return fmt.Errorf("opening events file: %w", err) + } + + // Seek to end to only process new events + if _, err := file.Seek(0, io.SeekEnd); err != nil { + file.Close() + return fmt.Errorf("seeking to end: %w", err) + } + + c.wg.Add(1) + go c.run(file) + + return nil +} + +// Stop gracefully stops the curator. +func (c *Curator) Stop() { + c.cancel() + c.wg.Wait() +} + +// run is the main curator loop. +func (c *Curator) run(file *os.File) { + defer c.wg.Done() + defer file.Close() + + reader := bufio.NewReader(file) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + // Cleanup ticker for stale aggregation state + cleanupTicker := time.NewTicker(time.Minute) + defer cleanupTicker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + + case <-cleanupTicker.C: + c.cleanupStaleState() + + case <-ticker.C: + // Read available lines + for { + line, err := reader.ReadString('\n') + if err != nil { + break // No more data available + } + c.processLine(line) + } + } + } +} + +// processLine processes a single line from the events file. +func (c *Curator) processLine(line string) { + if line == "" || line == "\n" { + return + } + + var rawEvent events.Event + if err := json.Unmarshal([]byte(line), &rawEvent); err != nil { + return // Skip malformed lines + } + + // Filter by visibility - only process feed-visible events + if rawEvent.Visibility != events.VisibilityFeed && rawEvent.Visibility != events.VisibilityBoth { + return + } + + // Apply deduplication and aggregation + if c.shouldDedupe(&rawEvent) { + return + } + + // Write to feed + c.writeFeedEvent(&rawEvent) +} + +// shouldDedupe checks if an event should be deduplicated. +// Returns true if the event should be dropped. +func (c *Curator) shouldDedupe(event *events.Event) bool { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + + switch event.Type { + case events.TypeDone: + // Dedupe repeated done events from same actor within window + if lastDone, ok := c.recentDone[event.Actor]; ok { + if now.Sub(lastDone) < doneDedupeWindow { + return true // Skip duplicate + } + } + c.recentDone[event.Actor] = now + return false + + case events.TypeSling: + // Track for potential aggregation (but don't dedupe single slings) + target, _ := event.Payload["target"].(string) + c.recentSling[event.Actor] = append(c.recentSling[event.Actor], slingRecord{ + target: target, + ts: now, + }) + // Prune old records + c.recentSling[event.Actor] = c.pruneRecords(c.recentSling[event.Actor], slingAggregateWindow) + return false + + case events.TypeMail: + // Track mail count for potential aggregation + c.recentMail[event.Actor]++ + // Reset after window (rough approximation) + go func(actor string) { + time.Sleep(mailAggregateWindow) + c.mu.Lock() + defer c.mu.Unlock() + if c.recentMail[actor] > 0 { + c.recentMail[actor]-- + } + }(event.Actor) + return false + } + + return false +} + +// pruneRecords removes records older than the window. +func (c *Curator) pruneRecords(records []slingRecord, window time.Duration) []slingRecord { + now := time.Now() + result := make([]slingRecord, 0, len(records)) + for _, r := range records { + if now.Sub(r.ts) < window { + result = append(result, r) + } + } + return result +} + +// cleanupStaleState removes old entries from tracking maps. +func (c *Curator) cleanupStaleState() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + staleThreshold := 5 * time.Minute + + // Clean stale done records + for actor, ts := range c.recentDone { + if now.Sub(ts) > staleThreshold { + delete(c.recentDone, actor) + } + } + + // Clean stale sling records + for actor, records := range c.recentSling { + c.recentSling[actor] = c.pruneRecords(records, staleThreshold) + if len(c.recentSling[actor]) == 0 { + delete(c.recentSling, actor) + } + } + + // Reset mail counts + c.recentMail = make(map[string]int) +} + +// writeFeedEvent writes a curated event to the feed file. +func (c *Curator) writeFeedEvent(event *events.Event) { + feedEvent := FeedEvent{ + Timestamp: event.Timestamp, + Source: event.Source, + Type: event.Type, + Actor: event.Actor, + Summary: c.generateSummary(event), + Payload: event.Payload, + } + + // Check for aggregation opportunity + c.mu.Lock() + if event.Type == events.TypeSling { + if records := c.recentSling[event.Actor]; len(records) >= minAggregateCount { + feedEvent.Count = len(records) + feedEvent.Summary = fmt.Sprintf("%s dispatching work to %d agents", event.Actor, len(records)) + } + } + c.mu.Unlock() + + data, err := json.Marshal(feedEvent) + if err != nil { + return + } + data = append(data, '\n') + + feedPath := filepath.Join(c.townRoot, FeedFile) + f, err := os.OpenFile(feedPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + + f.Write(data) +} + +// generateSummary creates a human-readable summary of an event. +func (c *Curator) generateSummary(event *events.Event) string { + switch event.Type { + case events.TypeSling: + if target, ok := event.Payload["target"].(string); ok { + if bead, ok := event.Payload["bead"].(string); ok { + return fmt.Sprintf("%s assigned %s to %s", event.Actor, bead, target) + } + } + return fmt.Sprintf("%s dispatched work", event.Actor) + + case events.TypeDone: + if bead, ok := event.Payload["bead"].(string); ok { + return fmt.Sprintf("%s completed work on %s", event.Actor, bead) + } + return fmt.Sprintf("%s signaled done", event.Actor) + + case events.TypeHandoff: + return fmt.Sprintf("%s handed off to fresh session", event.Actor) + + case events.TypeMail: + if to, ok := event.Payload["to"].(string); ok { + if subj, ok := event.Payload["subject"].(string); ok { + return fmt.Sprintf("%s → %s: %s", event.Actor, to, subj) + } + } + return fmt.Sprintf("%s sent mail", event.Actor) + + case events.TypePatrolStarted: + if rig, ok := event.Payload["rig"].(string); ok { + return fmt.Sprintf("%s patrol started for %s", event.Actor, rig) + } + return fmt.Sprintf("%s started patrol", event.Actor) + + case events.TypePatrolComplete: + if msg, ok := event.Payload["message"].(string); ok { + return msg + } + return fmt.Sprintf("%s completed patrol", event.Actor) + + case events.TypeMerged: + if worker, ok := event.Payload["worker"].(string); ok { + return fmt.Sprintf("Merged work from %s", worker) + } + return "Work merged" + + case events.TypeMergeFailed: + if reason, ok := event.Payload["reason"].(string); ok { + return fmt.Sprintf("Merge failed: %s", reason) + } + return "Merge failed" + + default: + return fmt.Sprintf("%s: %s", event.Actor, event.Type) + } +} diff --git a/internal/feed/curator_test.go b/internal/feed/curator_test.go new file mode 100644 index 00000000..35141d6f --- /dev/null +++ b/internal/feed/curator_test.go @@ -0,0 +1,196 @@ +package feed + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" + + "github.com/steveyegge/gastown/internal/events" +) + +func TestCurator_FiltersByVisibility(t *testing.T) { + // Create temp directory + tmpDir, err := os.MkdirTemp("", "feed-test-*") + if err != nil { + t.Fatalf("creating temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + // Create events file with test events + eventsPath := filepath.Join(tmpDir, events.EventsFile) + feedPath := filepath.Join(tmpDir, FeedFile) + + // Write a feed-visible event + feedEvent := events.Event{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Source: "gt", + Type: events.TypeSling, + Actor: "mayor", + Payload: map[string]interface{}{"bead": "gt-123", "target": "gastown/slit"}, + Visibility: events.VisibilityFeed, + } + feedData, _ := json.Marshal(feedEvent) + + // Write an audit-only event (should be filtered out) + auditEvent := events.Event{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Source: "gt", + Type: "internal_check", + Actor: "daemon", + Visibility: events.VisibilityAudit, + } + auditData, _ := json.Marshal(auditEvent) + + // Create events file + if err := os.WriteFile(eventsPath, []byte{}, 0644); err != nil { + t.Fatalf("creating events file: %v", err) + } + + // Start curator + curator := NewCurator(tmpDir) + if err := curator.Start(); err != nil { + t.Fatalf("starting curator: %v", err) + } + defer curator.Stop() + + // Give curator time to start + time.Sleep(50 * time.Millisecond) + + // Append events + f, err := os.OpenFile(eventsPath, os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + t.Fatalf("opening events file: %v", err) + } + f.Write(append(feedData, '\n')) + f.Write(append(auditData, '\n')) + f.Close() + + // Wait for processing + time.Sleep(300 * time.Millisecond) + + // Check feed file + feedContent, err := os.ReadFile(feedPath) + if err != nil { + t.Fatalf("reading feed file: %v", err) + } + + // Should contain feed event but not audit event + if len(feedContent) == 0 { + t.Error("feed file is empty, expected at least one event") + } + + var writtenEvent FeedEvent + if err := json.Unmarshal(feedContent[:len(feedContent)-1], &writtenEvent); err != nil { + t.Fatalf("parsing feed event: %v", err) + } + + if writtenEvent.Type != events.TypeSling { + t.Errorf("expected type %s, got %s", events.TypeSling, writtenEvent.Type) + } + if writtenEvent.Actor != "mayor" { + t.Errorf("expected actor 'mayor', got %s", writtenEvent.Actor) + } +} + +func TestCurator_DedupesDoneEvents(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "feed-test-*") + if err != nil { + t.Fatalf("creating temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + eventsPath := filepath.Join(tmpDir, events.EventsFile) + feedPath := filepath.Join(tmpDir, FeedFile) + + // Create events file + if err := os.WriteFile(eventsPath, []byte{}, 0644); err != nil { + t.Fatalf("creating events file: %v", err) + } + + // Start curator + curator := NewCurator(tmpDir) + if err := curator.Start(); err != nil { + t.Fatalf("starting curator: %v", err) + } + defer curator.Stop() + + time.Sleep(50 * time.Millisecond) + + // Write 3 identical done events from same actor + f, _ := os.OpenFile(eventsPath, os.O_APPEND|os.O_WRONLY, 0644) + for i := 0; i < 3; i++ { + doneEvent := events.Event{ + Timestamp: time.Now().UTC().Format(time.RFC3339), + Source: "gt", + Type: events.TypeDone, + Actor: "gastown/slit", + Payload: map[string]interface{}{"bead": "slit-12345"}, + Visibility: events.VisibilityFeed, + } + data, _ := json.Marshal(doneEvent) + f.Write(append(data, '\n')) + } + f.Close() + + // Wait for processing + time.Sleep(300 * time.Millisecond) + + // Count feed events + feedContent, _ := os.ReadFile(feedPath) + lines := 0 + for _, b := range feedContent { + if b == '\n' { + lines++ + } + } + + // Should only have 1 event due to deduplication + if lines != 1 { + t.Errorf("expected 1 feed event after deduplication, got %d", lines) + } +} + +func TestCurator_GeneratesSummary(t *testing.T) { + tmpDir, _ := os.MkdirTemp("", "feed-test-*") + defer os.RemoveAll(tmpDir) + + curator := NewCurator(tmpDir) + + tests := []struct { + event *events.Event + expected string + }{ + { + event: &events.Event{ + Type: events.TypeSling, + Actor: "mayor", + Payload: map[string]interface{}{"bead": "gt-123", "target": "gastown/slit"}, + }, + expected: "mayor assigned gt-123 to gastown/slit", + }, + { + event: &events.Event{ + Type: events.TypeDone, + Actor: "gastown/slit", + Payload: map[string]interface{}{"bead": "slit-12345"}, + }, + expected: "gastown/slit completed work on slit-12345", + }, + { + event: &events.Event{ + Type: events.TypeHandoff, + Actor: "gastown/witness", + }, + expected: "gastown/witness handed off to fresh session", + }, + } + + for _, tc := range tests { + summary := curator.generateSummary(tc.event) + if summary != tc.expected { + t.Errorf("generateSummary(%s): expected %q, got %q", tc.event.Type, tc.expected, summary) + } + } +}