From d2fccd580c266ad8e8789a5c87d18d88fda6d0b5 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Sat, 20 Dec 2025 13:19:40 -0800 Subject: [PATCH] feat(daemon): Add slot-based notification deduplication (gt-wpg) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement replaceable notifications to prevent heartbeat stacking when agents are busy. Only the latest notification per slot is delivered. Changes: - Add NotificationManager for tracking pending notifications - Add SendKeysReplace() that clears input line before sending - Integrate slot tracking into daemon heartbeat pokes - Mark notifications consumed when agent shows activity The system tracks pending notifications in state files and skips sending if a notification for the same slot is still pending. When agent activity is detected (keepalive), slots are marked consumed allowing new notifications to be sent. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/daemon/daemon.go | 81 ++++++++---- internal/daemon/notification.go | 212 ++++++++++++++++++++++++++++++++ internal/tmux/tmux.go | 19 +++ 3 files changed, 291 insertions(+), 21 deletions(-) create mode 100644 internal/daemon/notification.go diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 7c160413..56a204ca 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -22,12 +22,13 @@ import ( // Daemon is the town-level background service. type Daemon struct { - config *Config - tmux *tmux.Tmux - logger *log.Logger - ctx context.Context - cancel context.CancelFunc - backoff *BackoffManager + config *Config + tmux *tmux.Tmux + logger *log.Logger + ctx context.Context + cancel context.CancelFunc + backoff *BackoffManager + notifications *NotificationManager } // New creates a new daemon instance. 
@@ -47,13 +48,18 @@ func New(config *Config) (*Daemon, error) { logger := log.New(logFile, "", log.LstdFlags) ctx, cancel := context.WithCancel(context.Background()) + // Initialize notification manager for slot-based deduplication + notifDir := filepath.Join(daemonDir, "notifications") + notifMaxAge := 5 * time.Minute // Notifications expire after 5 minutes + return &Daemon{ - config: config, - tmux: tmux.NewTmux(), - logger: logger, - ctx: ctx, - cancel: cancel, - backoff: NewBackoffManager(DefaultBackoffConfig()), + config: config, + tmux: tmux.NewTmux(), + logger: logger, + ctx: ctx, + cancel: cancel, + backoff: NewBackoffManager(DefaultBackoffConfig()), + notifications: NewNotificationManager(notifDir, notifMaxAge), }, nil } @@ -110,6 +116,9 @@ func (d *Daemon) Run() error { func (d *Daemon) heartbeat(state *State) { d.logger.Println("Heartbeat starting") + // 0. Clean up stale notification slots periodically + _ = d.notifications.ClearStaleSlots() + // 1. Ensure Deacon is running (the Deacon is the heartbeat of the system) d.ensureDeaconRunning() @@ -205,8 +214,9 @@ func (d *Daemon) pokeDeacon() { } if isFresh { - // Deacon is actively working, reset backoff + // Deacon is actively working, reset backoff and mark notifications consumed d.backoff.RecordActivity(agentID) + _ = d.notifications.MarkConsumed(DeaconSessionName, SlotHeartbeat) d.logger.Println("Deacon is fresh, skipping poke") return } @@ -218,13 +228,22 @@ func (d *Daemon) pokeDeacon() { return } - // Send heartbeat message via tmux + // Check if we should send (slot-based deduplication) + shouldSend, _ := d.notifications.ShouldSend(DeaconSessionName, SlotHeartbeat) + if !shouldSend { + d.logger.Println("Heartbeat already pending for Deacon, skipping") + return + } + + // Send heartbeat message via tmux, replacing any pending input msg := "HEARTBEAT: run your rounds" - if err := d.tmux.SendKeys(DeaconSessionName, msg); err != nil { + if err := d.tmux.SendKeysReplace(DeaconSessionName, msg, 50); err 
!= nil { d.logger.Printf("Error poking Deacon: %v", err) return } + // Record the send for slot deduplication + _ = d.notifications.RecordSend(DeaconSessionName, SlotHeartbeat, msg) d.backoff.RecordPoke(agentID) // Adjust backoff based on staleness @@ -258,8 +277,9 @@ func (d *Daemon) pokeMayor() { // Check keepalive to see if agent is active state := keepalive.Read(d.config.TownRoot) if state != nil && state.IsFresh() { - // Agent is actively working, reset backoff + // Agent is actively working, reset backoff and mark notifications consumed d.backoff.RecordActivity(agentID) + _ = d.notifications.MarkConsumed(mayorSession, SlotHeartbeat) d.logger.Printf("Mayor is fresh (cmd: %s), skipping poke", state.LastCommand) return } @@ -271,13 +291,22 @@ func (d *Daemon) pokeMayor() { return } - // Send heartbeat message via tmux + // Check if we should send (slot-based deduplication) + shouldSend, _ := d.notifications.ShouldSend(mayorSession, SlotHeartbeat) + if !shouldSend { + d.logger.Println("Heartbeat already pending for Mayor, skipping") + return + } + + // Send heartbeat message via tmux, replacing any pending input msg := "HEARTBEAT: check your rigs" - if err := d.tmux.SendKeys(mayorSession, msg); err != nil { + if err := d.tmux.SendKeysReplace(mayorSession, msg, 50); err != nil { d.logger.Printf("Error poking Mayor: %v", err) return } + // Record the send for slot deduplication + _ = d.notifications.RecordSend(mayorSession, SlotHeartbeat, msg) d.backoff.RecordPoke(agentID) // If agent is stale or very stale, record a miss (increase backoff) @@ -399,8 +428,9 @@ func (d *Daemon) pokeWitness(session string) { // Check keepalive to see if the witness is active state := keepalive.Read(rigWorkspace) if state != nil && state.IsFresh() { - // Witness is actively working, reset backoff + // Witness is actively working, reset backoff and mark notifications consumed d.backoff.RecordActivity(agentID) + _ = d.notifications.MarkConsumed(session, SlotHeartbeat) 
d.logger.Printf("Witness %s is fresh (cmd: %s), skipping poke", session, state.LastCommand) return } @@ -412,13 +442,22 @@ func (d *Daemon) pokeWitness(session string) { return } - // Send heartbeat message + // Check if we should send (slot-based deduplication) + shouldSend, _ := d.notifications.ShouldSend(session, SlotHeartbeat) + if !shouldSend { + d.logger.Printf("Heartbeat already pending for Witness %s, skipping", session) + return + } + + // Send heartbeat message, replacing any pending input msg := "HEARTBEAT: check your workers" - if err := d.tmux.SendKeys(session, msg); err != nil { + if err := d.tmux.SendKeysReplace(session, msg, 50); err != nil { d.logger.Printf("Error poking Witness %s: %v", session, err) return } + // Record the send for slot deduplication + _ = d.notifications.RecordSend(session, SlotHeartbeat, msg) d.backoff.RecordPoke(agentID) // If agent is stale or very stale, record a miss (increase backoff) diff --git a/internal/daemon/notification.go b/internal/daemon/notification.go new file mode 100644 index 00000000..a3f4feac --- /dev/null +++ b/internal/daemon/notification.go @@ -0,0 +1,212 @@ +package daemon + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" +) + +// NotificationSlot tracks a pending notification for deduplication. +// Only the latest notification per slot matters - earlier ones are replaced. +type NotificationSlot struct { + Slot string `json:"slot"` + Session string `json:"session"` + Message string `json:"message"` + SentAt time.Time `json:"sent_at"` + Consumed bool `json:"consumed"` + ConsumedAt time.Time `json:"consumed_at,omitempty"` +} + +// NotificationManager handles slot-based notification deduplication. +// It ensures that for a given (session, slot) pair, only one notification +// is pending at a time. Sending a new notification to the same slot +// replaces the previous one. 
+type NotificationManager struct {
+	stateDir string        // directory holding one JSON state file per (session, slot)
+	maxAge   time.Duration // how long an unconsumed notification blocks a resend
+}
+
+// NewNotificationManager creates a new notification manager.
+//
+// stateDir is where slot state files are stored (e.g., ~/gt/daemon/notifications/).
+// The directory is not created here; RecordSend creates it on first use.
+// maxAge is the age after which a pending notification is treated as stale.
+func NewNotificationManager(stateDir string, maxAge time.Duration) *NotificationManager {
+	return &NotificationManager{
+		stateDir: stateDir,
+		maxAge:   maxAge,
+	}
+}
+
+// slotPath returns the path to the slot state file:
+// <stateDir>/slot-<session>-<slot>.json, with every '/' in the session
+// name replaced by '-' so the name stays a single path component.
+func (m *NotificationManager) slotPath(session, slot string) string {
+	// '/' and '-' are both single bytes, so swapping them in a byte copy is
+	// safe for UTF-8 input and avoids rebuilding the string once per match.
+	name := []byte(session)
+	for i, c := range name {
+		if c == '/' {
+			name[i] = '-'
+		}
+	}
+	return filepath.Join(m.stateDir, fmt.Sprintf("slot-%s-%s.json", name, slot))
+}
+
+// writeSlot serializes ns as JSON and writes it to path.
+func (m *NotificationManager) writeSlot(path string, ns *NotificationSlot) error {
+	data, err := json.Marshal(ns)
+	if err != nil {
+		return fmt.Errorf("encoding notification slot: %w", err)
+	}
+	if err := os.WriteFile(path, data, 0644); err != nil {
+		return fmt.Errorf("writing notification slot: %w", err)
+	}
+	return nil
+}
+
+// GetSlot reads the current state of a notification slot.
+// It returns (nil, nil) when no state file exists for the pair, and an
+// error when the file cannot be read or does not hold valid JSON.
+func (m *NotificationManager) GetSlot(session, slot string) (*NotificationSlot, error) {
+	path := m.slotPath(session, slot)
+	data, err := os.ReadFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil // No slot state
+		}
+		return nil, fmt.Errorf("reading notification slot: %w", err)
+	}
+
+	var ns NotificationSlot
+	if err := json.Unmarshal(data, &ns); err != nil {
+		return nil, fmt.Errorf("decoding notification slot %q: %w", path, err)
+	}
+	return &ns, nil
+}
+
+// ShouldSend checks if a notification should be sent for this slot.
+// Returns true if:
+//   - No pending notification exists for this slot
+//   - The pending notification was consumed
+//   - The pending notification is stale (older than maxAge)
+//   - The slot state could not be read; the error is returned alongside
+//     true so that a broken state file never suppresses a notification
+func (m *NotificationManager) ShouldSend(session, slot string) (bool, error) {
+	ns, err := m.GetSlot(session, slot)
+	switch {
+	case err != nil:
+		return true, err // On error, allow sending
+	case ns == nil:
+		return true, nil // No pending notification
+	case ns.Consumed:
+		return true, nil // Previous was consumed
+	case time.Since(ns.SentAt) > m.maxAge:
+		return true, nil // Stale, allow new send
+	}
+	return false, nil // Recent pending notification exists
+}
+
+// RecordSend records that a notification was sent for a slot.
+// Any previous state for the slot is overwritten, which makes the slot
+// pending again. The state directory is created if it does not exist.
+func (m *NotificationManager) RecordSend(session, slot, message string) error {
+	if err := os.MkdirAll(m.stateDir, 0755); err != nil {
+		return fmt.Errorf("creating notification state dir: %w", err)
+	}
+
+	ns := &NotificationSlot{
+		Slot:    slot,
+		Session: session,
+		Message: message,
+		SentAt:  time.Now(),
+	}
+	return m.writeSlot(m.slotPath(session, slot), ns)
+}
+
+// MarkConsumed marks a slot's notification as consumed (agent responded).
+// It is a no-op when the slot has no state or is already consumed, so
+// repeated calls neither rewrite the file nor move ConsumedAt past the
+// first time the notification was marked.
+func (m *NotificationManager) MarkConsumed(session, slot string) error {
+	ns, err := m.GetSlot(session, slot)
+	if err != nil {
+		return err
+	}
+	if ns == nil || ns.Consumed {
+		return nil // Nothing to mark
+	}
+
+	ns.Consumed = true
+	ns.ConsumedAt = time.Now()
+	return m.writeSlot(m.slotPath(session, slot), ns)
+}
+
+// MarkSessionActive marks all slots for a session as consumed.
+// Call this when the session shows activity (keepalive update).
+func (m *NotificationManager) MarkSessionActive(session string) error {
+	// Scan every slot file and select by the session recorded inside it.
+	// A "slot-<session>-*.json" glob built from the raw name would miss
+	// sessions containing '/' (slotPath stores those with '-' instead) and
+	// would also match other sessions whose names merely share the prefix.
+	matches, err := filepath.Glob(filepath.Join(m.stateDir, "slot-*.json"))
+	if err != nil {
+		return err
+	}
+
+	var firstErr error
+	for _, path := range matches {
+		data, err := os.ReadFile(path)
+		if err != nil {
+			continue // unreadable or removed meanwhile; best effort
+		}
+
+		var ns NotificationSlot
+		if err := json.Unmarshal(data, &ns); err != nil {
+			continue // not valid slot state; skip it
+		}
+		if ns.Session != session || ns.Consumed {
+			continue
+		}
+
+		ns.Consumed = true
+		ns.ConsumedAt = time.Now()
+		out, err := json.Marshal(&ns)
+		if err == nil {
+			err = os.WriteFile(path, out, 0644)
+		}
+		// Keep going so one bad file does not block the others, but
+		// report the first failure instead of dropping it.
+		if err != nil && firstErr == nil {
+			firstErr = fmt.Errorf("marking slot %q consumed: %w", path, err)
+		}
+	}
+	return firstErr
+}
+
+// ClearSlot removes the state file for a slot.
+// A slot that has no state file is not an error.
+func (m *NotificationManager) ClearSlot(session, slot string) error {
+	if err := os.Remove(m.slotPath(session, slot)); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+	return nil
+}
+
+// ClearStaleSlots removes slot files older than maxAge, judged by the
+// file's modification time. Cleanup is best effort: files that cannot be
+// inspected or removed are skipped, and a file that could not be removed
+// is simply considered again on the next call.
+func (m *NotificationManager) ClearStaleSlots() error {
+	matches, err := filepath.Glob(filepath.Join(m.stateDir, "slot-*.json"))
+	if err != nil {
+		return err
+	}
+
+	for _, path := range matches {
+		info, err := os.Stat(path)
+		if err != nil {
+			continue
+		}
+		if time.Since(info.ModTime()) > m.maxAge {
+			_ = os.Remove(path) // best effort
+		}
+	}
+	return nil
+}
+
+// Common notification slots
+const (
+	SlotHeartbeat = "heartbeat"
+	SlotStatus    = "status"
+)
diff --git a/internal/tmux/tmux.go b/internal/tmux/tmux.go
index 65eb0c09..d546cb42 100644
--- a/internal/tmux/tmux.go
+++ b/internal/tmux/tmux.go
@@ -138,6 +138,25 @@ func (t *Tmux) SendKeysRaw(session, keys string) error {
 	return err
 }
 
+// SendKeysReplace sends keystrokes, clearing any pending input first.
+// This is useful for "replaceable" notifications where only the latest matters.
+// Uses Ctrl-U to clear the input line before sending the new message.
+// The delay parameter controls how long to wait after clearing before sending (ms).
+func (t *Tmux) SendKeysReplace(session, keys string, clearDelayMs int) error {
+	// Ctrl-U goes first so the pending input line is cleared before the new text.
+	_, err := t.run("send-keys", "-t", session, "C-u")
+	if err != nil {
+		return err
+	}
+
+	// Let the clear take effect; zero or negative delays skip the wait.
+	if wait := time.Duration(clearDelayMs) * time.Millisecond; clearDelayMs > 0 {
+		time.Sleep(wait)
+	}
+
+	return t.SendKeys(session, keys)
+}
+
 // SendKeysDelayed sends keystrokes after a delay (in milliseconds).
 // Useful for waiting for a process to be ready before sending input.
 func (t *Tmux) SendKeysDelayed(session, keys string, delayMs int) error {