feat(daemon): Add slot-based notification deduplication (gt-wpg)
Implement replaceable notifications to prevent heartbeat stacking when agents are busy. Only the latest notification per slot is delivered. Changes: - Add NotificationManager for tracking pending notifications - Add SendKeysReplace() that clears input line before sending - Integrate slot tracking into daemon heartbeat pokes - Mark notifications consumed when agent shows activity The system tracks pending notifications in state files and skips sending if a notification for the same slot is still pending. When agent activity is detected (keepalive), slots are marked consumed allowing new notifications to be sent. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -22,12 +22,13 @@ import (
|
||||
|
||||
// Daemon is the town-level background service.
|
||||
type Daemon struct {
|
||||
config *Config
|
||||
tmux *tmux.Tmux
|
||||
logger *log.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backoff *BackoffManager
|
||||
config *Config
|
||||
tmux *tmux.Tmux
|
||||
logger *log.Logger
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
backoff *BackoffManager
|
||||
notifications *NotificationManager
|
||||
}
|
||||
|
||||
// New creates a new daemon instance.
|
||||
@@ -47,13 +48,18 @@ func New(config *Config) (*Daemon, error) {
|
||||
logger := log.New(logFile, "", log.LstdFlags)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Initialize notification manager for slot-based deduplication
|
||||
notifDir := filepath.Join(daemonDir, "notifications")
|
||||
notifMaxAge := 5 * time.Minute // Notifications expire after 5 minutes
|
||||
|
||||
return &Daemon{
|
||||
config: config,
|
||||
tmux: tmux.NewTmux(),
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
backoff: NewBackoffManager(DefaultBackoffConfig()),
|
||||
config: config,
|
||||
tmux: tmux.NewTmux(),
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
backoff: NewBackoffManager(DefaultBackoffConfig()),
|
||||
notifications: NewNotificationManager(notifDir, notifMaxAge),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -110,6 +116,9 @@ func (d *Daemon) Run() error {
|
||||
func (d *Daemon) heartbeat(state *State) {
|
||||
d.logger.Println("Heartbeat starting")
|
||||
|
||||
// 0. Clean up stale notification slots periodically
|
||||
_ = d.notifications.ClearStaleSlots()
|
||||
|
||||
// 1. Ensure Deacon is running (the Deacon is the heartbeat of the system)
|
||||
d.ensureDeaconRunning()
|
||||
|
||||
@@ -205,8 +214,9 @@ func (d *Daemon) pokeDeacon() {
|
||||
}
|
||||
|
||||
if isFresh {
|
||||
// Deacon is actively working, reset backoff
|
||||
// Deacon is actively working, reset backoff and mark notifications consumed
|
||||
d.backoff.RecordActivity(agentID)
|
||||
_ = d.notifications.MarkConsumed(DeaconSessionName, SlotHeartbeat)
|
||||
d.logger.Println("Deacon is fresh, skipping poke")
|
||||
return
|
||||
}
|
||||
@@ -218,13 +228,22 @@ func (d *Daemon) pokeDeacon() {
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message via tmux
|
||||
// Check if we should send (slot-based deduplication)
|
||||
shouldSend, _ := d.notifications.ShouldSend(DeaconSessionName, SlotHeartbeat)
|
||||
if !shouldSend {
|
||||
d.logger.Println("Heartbeat already pending for Deacon, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message via tmux, replacing any pending input
|
||||
msg := "HEARTBEAT: run your rounds"
|
||||
if err := d.tmux.SendKeys(DeaconSessionName, msg); err != nil {
|
||||
if err := d.tmux.SendKeysReplace(DeaconSessionName, msg, 50); err != nil {
|
||||
d.logger.Printf("Error poking Deacon: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Record the send for slot deduplication
|
||||
_ = d.notifications.RecordSend(DeaconSessionName, SlotHeartbeat, msg)
|
||||
d.backoff.RecordPoke(agentID)
|
||||
|
||||
// Adjust backoff based on staleness
|
||||
@@ -258,8 +277,9 @@ func (d *Daemon) pokeMayor() {
|
||||
// Check keepalive to see if agent is active
|
||||
state := keepalive.Read(d.config.TownRoot)
|
||||
if state != nil && state.IsFresh() {
|
||||
// Agent is actively working, reset backoff
|
||||
// Agent is actively working, reset backoff and mark notifications consumed
|
||||
d.backoff.RecordActivity(agentID)
|
||||
_ = d.notifications.MarkConsumed(mayorSession, SlotHeartbeat)
|
||||
d.logger.Printf("Mayor is fresh (cmd: %s), skipping poke", state.LastCommand)
|
||||
return
|
||||
}
|
||||
@@ -271,13 +291,22 @@ func (d *Daemon) pokeMayor() {
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message via tmux
|
||||
// Check if we should send (slot-based deduplication)
|
||||
shouldSend, _ := d.notifications.ShouldSend(mayorSession, SlotHeartbeat)
|
||||
if !shouldSend {
|
||||
d.logger.Println("Heartbeat already pending for Mayor, skipping")
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message via tmux, replacing any pending input
|
||||
msg := "HEARTBEAT: check your rigs"
|
||||
if err := d.tmux.SendKeys(mayorSession, msg); err != nil {
|
||||
if err := d.tmux.SendKeysReplace(mayorSession, msg, 50); err != nil {
|
||||
d.logger.Printf("Error poking Mayor: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Record the send for slot deduplication
|
||||
_ = d.notifications.RecordSend(mayorSession, SlotHeartbeat, msg)
|
||||
d.backoff.RecordPoke(agentID)
|
||||
|
||||
// If agent is stale or very stale, record a miss (increase backoff)
|
||||
@@ -399,8 +428,9 @@ func (d *Daemon) pokeWitness(session string) {
|
||||
// Check keepalive to see if the witness is active
|
||||
state := keepalive.Read(rigWorkspace)
|
||||
if state != nil && state.IsFresh() {
|
||||
// Witness is actively working, reset backoff
|
||||
// Witness is actively working, reset backoff and mark notifications consumed
|
||||
d.backoff.RecordActivity(agentID)
|
||||
_ = d.notifications.MarkConsumed(session, SlotHeartbeat)
|
||||
d.logger.Printf("Witness %s is fresh (cmd: %s), skipping poke", session, state.LastCommand)
|
||||
return
|
||||
}
|
||||
@@ -412,13 +442,22 @@ func (d *Daemon) pokeWitness(session string) {
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message
|
||||
// Check if we should send (slot-based deduplication)
|
||||
shouldSend, _ := d.notifications.ShouldSend(session, SlotHeartbeat)
|
||||
if !shouldSend {
|
||||
d.logger.Printf("Heartbeat already pending for Witness %s, skipping", session)
|
||||
return
|
||||
}
|
||||
|
||||
// Send heartbeat message, replacing any pending input
|
||||
msg := "HEARTBEAT: check your workers"
|
||||
if err := d.tmux.SendKeys(session, msg); err != nil {
|
||||
if err := d.tmux.SendKeysReplace(session, msg, 50); err != nil {
|
||||
d.logger.Printf("Error poking Witness %s: %v", session, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Record the send for slot deduplication
|
||||
_ = d.notifications.RecordSend(session, SlotHeartbeat, msg)
|
||||
d.backoff.RecordPoke(agentID)
|
||||
|
||||
// If agent is stale or very stale, record a miss (increase backoff)
|
||||
|
||||
212
internal/daemon/notification.go
Normal file
212
internal/daemon/notification.go
Normal file
@@ -0,0 +1,212 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NotificationSlot is the persisted record of the most recent notification
// for one (session, slot) pair. It is what gets serialized to the slot's
// JSON state file; only the latest record per slot matters.
type NotificationSlot struct {
	// Slot is the name of the slot the notification belongs to.
	Slot string `json:"slot"`
	// Session is the name of the session the notification was sent to.
	Session string `json:"session"`
	// Message is the text of the notification.
	Message string `json:"message"`
	// SentAt is the time the notification was recorded as sent.
	SentAt time.Time `json:"sent_at"`
	// Consumed reports whether the notification has been marked consumed.
	Consumed bool `json:"consumed"`
	// ConsumedAt is the time the notification was marked consumed.
	// NOTE: encoding/json's omitempty never treats a struct value as empty,
	// so a zero ConsumedAt is still written to the file.
	ConsumedAt time.Time `json:"consumed_at,omitempty"`
}
|
||||
|
||||
// NotificationManager implements slot-based notification deduplication.
// State is kept as one file per (session, slot) pair inside stateDir, so at
// most one notification per pair is tracked at a time; recording a new send
// for a slot overwrites the previous record.
type NotificationManager struct {
	// stateDir is the directory holding the slot state files.
	stateDir string
	// maxAge is the age beyond which a slot record is treated as stale.
	maxAge time.Duration
}
|
||||
|
||||
// NewNotificationManager creates a new notification manager.
|
||||
// stateDir is where slot state files are stored (e.g., ~/gt/daemon/notifications/)
|
||||
func NewNotificationManager(stateDir string, maxAge time.Duration) *NotificationManager {
|
||||
return &NotificationManager{
|
||||
stateDir: stateDir,
|
||||
maxAge: maxAge,
|
||||
}
|
||||
}
|
||||
|
||||
// slotPath returns the path to the slot state file.
|
||||
func (m *NotificationManager) slotPath(session, slot string) string {
|
||||
// Sanitize session name (replace / with -)
|
||||
safeSession := session
|
||||
for i := range safeSession {
|
||||
if safeSession[i] == '/' {
|
||||
safeSession = safeSession[:i] + "-" + safeSession[i+1:]
|
||||
}
|
||||
}
|
||||
return filepath.Join(m.stateDir, fmt.Sprintf("slot-%s-%s.json", safeSession, slot))
|
||||
}
|
||||
|
||||
// GetSlot reads the current state of a notification slot.
|
||||
func (m *NotificationManager) GetSlot(session, slot string) (*NotificationSlot, error) {
|
||||
path := m.slotPath(session, slot)
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil // No slot state
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ns NotificationSlot
|
||||
if err := json.Unmarshal(data, &ns); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ns, nil
|
||||
}
|
||||
|
||||
// ShouldSend checks if a notification should be sent for this slot.
|
||||
// Returns true if:
|
||||
// - No pending notification exists for this slot
|
||||
// - The pending notification is stale (older than maxAge)
|
||||
// - The pending notification was consumed
|
||||
func (m *NotificationManager) ShouldSend(session, slot string) (bool, error) {
|
||||
ns, err := m.GetSlot(session, slot)
|
||||
if err != nil {
|
||||
return true, err // On error, allow sending
|
||||
}
|
||||
|
||||
if ns == nil {
|
||||
return true, nil // No pending notification
|
||||
}
|
||||
|
||||
if ns.Consumed {
|
||||
return true, nil // Previous was consumed
|
||||
}
|
||||
|
||||
// Check if stale
|
||||
if time.Since(ns.SentAt) > m.maxAge {
|
||||
return true, nil // Stale, allow new send
|
||||
}
|
||||
|
||||
return false, nil // Recent pending notification exists
|
||||
}
|
||||
|
||||
// RecordSend records that a notification was sent for a slot.
|
||||
func (m *NotificationManager) RecordSend(session, slot, message string) error {
|
||||
// Ensure directory exists
|
||||
if err := os.MkdirAll(m.stateDir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ns := &NotificationSlot{
|
||||
Slot: slot,
|
||||
Session: session,
|
||||
Message: message,
|
||||
SentAt: time.Now(),
|
||||
Consumed: false,
|
||||
}
|
||||
|
||||
data, err := json.Marshal(ns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.WriteFile(m.slotPath(session, slot), data, 0644)
|
||||
}
|
||||
|
||||
// MarkConsumed marks a slot's notification as consumed (agent responded).
|
||||
func (m *NotificationManager) MarkConsumed(session, slot string) error {
|
||||
ns, err := m.GetSlot(session, slot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ns == nil {
|
||||
return nil // Nothing to mark
|
||||
}
|
||||
|
||||
ns.Consumed = true
|
||||
ns.ConsumedAt = time.Now()
|
||||
|
||||
data, err := json.Marshal(ns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return os.WriteFile(m.slotPath(session, slot), data, 0644)
|
||||
}
|
||||
|
||||
// MarkSessionActive marks all slots for a session as consumed.
|
||||
// Call this when the session shows activity (keepalive update).
|
||||
func (m *NotificationManager) MarkSessionActive(session string) error {
|
||||
// List all slot files for this session
|
||||
pattern := filepath.Join(m.stateDir, fmt.Sprintf("slot-%s-*.json", session))
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, path := range matches {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var ns NotificationSlot
|
||||
if err := json.Unmarshal(data, &ns); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if !ns.Consumed {
|
||||
ns.Consumed = true
|
||||
ns.ConsumedAt = time.Now()
|
||||
if data, err := json.Marshal(&ns); err == nil {
|
||||
_ = os.WriteFile(path, data, 0644)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClearSlot removes the state file for a slot.
|
||||
func (m *NotificationManager) ClearSlot(session, slot string) error {
|
||||
path := m.slotPath(session, slot)
|
||||
err := os.Remove(path)
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ClearStaleSlots removes slot files older than maxAge.
|
||||
func (m *NotificationManager) ClearStaleSlots() error {
|
||||
pattern := filepath.Join(m.stateDir, "slot-*.json")
|
||||
matches, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, path := range matches {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Since(info.ModTime()) > m.maxAge {
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Common notification slot names.

// SlotHeartbeat is the slot name for heartbeat notifications.
const SlotHeartbeat = "heartbeat"

// SlotStatus is the slot name for status notifications.
const SlotStatus = "status"
|
||||
@@ -138,6 +138,25 @@ func (t *Tmux) SendKeysRaw(session, keys string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// SendKeysReplace sends keystrokes, clearing any pending input first.
|
||||
// This is useful for "replaceable" notifications where only the latest matters.
|
||||
// Uses Ctrl-U to clear the input line before sending the new message.
|
||||
// The delay parameter controls how long to wait after clearing before sending (ms).
|
||||
func (t *Tmux) SendKeysReplace(session, keys string, clearDelayMs int) error {
|
||||
// Send Ctrl-U to clear any pending input on the line
|
||||
if _, err := t.run("send-keys", "-t", session, "C-u"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Small delay to let the clear take effect
|
||||
if clearDelayMs > 0 {
|
||||
time.Sleep(time.Duration(clearDelayMs) * time.Millisecond)
|
||||
}
|
||||
|
||||
// Now send the actual message
|
||||
return t.SendKeys(session, keys)
|
||||
}
|
||||
|
||||
// SendKeysDelayed sends keystrokes after a delay (in milliseconds).
|
||||
// Useful for waiting for a process to be ready before sending input.
|
||||
func (t *Tmux) SendKeysDelayed(session, keys string, delayMs int) error {
|
||||
|
||||
Reference in New Issue
Block a user