From 3a477f673ccfa7ef8f369ef24a7053826d283931 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Fri, 19 Dec 2025 12:09:26 -0800 Subject: [PATCH] feat(daemon): add adaptive backoff for heartbeat pokes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements per-agent backoff tracking to reduce noise for busy agents: - AgentBackoff type tracks interval, miss count, and last activity - BackoffManager manages state across all agents - Geometric backoff strategy (1.5x factor, 10min cap) - Integrates with keepalive to skip pokes when agents are fresh - Resets backoff immediately when activity detected Closes gt-8bx 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/daemon/backoff.go | 187 ++++++++++++++++++++ internal/daemon/backoff_test.go | 290 ++++++++++++++++++++++++++++++++ internal/daemon/daemon.go | 118 +++++++++++-- 3 files changed, 577 insertions(+), 18 deletions(-) create mode 100644 internal/daemon/backoff.go create mode 100644 internal/daemon/backoff_test.go diff --git a/internal/daemon/backoff.go b/internal/daemon/backoff.go new file mode 100644 index 00000000..abc43ac6 --- /dev/null +++ b/internal/daemon/backoff.go @@ -0,0 +1,187 @@ +package daemon + +import ( + "time" +) + +// BackoffStrategy defines how intervals grow. +type BackoffStrategy string + +const ( + // StrategyFixed keeps the same interval (no backoff). + StrategyFixed BackoffStrategy = "fixed" + + // StrategyGeometric multiplies by a factor each miss (1.5x). + StrategyGeometric BackoffStrategy = "geometric" + + // StrategyExponential doubles interval each miss (2x). + StrategyExponential BackoffStrategy = "exponential" +) + +// BackoffConfig holds backoff configuration. +type BackoffConfig struct { + // Strategy determines how intervals grow. + Strategy BackoffStrategy + + // BaseInterval is the starting interval (default 60s). + BaseInterval time.Duration + + // MaxInterval is the cap on how large intervals can grow (default 10m). + MaxInterval time.Duration + + // Factor is the multiplier for geometric backoff (default 1.5). + Factor float64 +} + +// DefaultBackoffConfig returns sensible defaults. +func DefaultBackoffConfig() *BackoffConfig { + return &BackoffConfig{ + Strategy: StrategyGeometric, + BaseInterval: 60 * time.Second, + MaxInterval: 10 * time.Minute, + Factor: 1.5, + } +} + +// AgentBackoff tracks backoff state for a single agent. +type AgentBackoff struct { + // AgentID identifies the agent (e.g., "mayor", "gastown-witness"). + AgentID string + + // BaseInterval is the starting interval. + BaseInterval time.Duration + + // CurrentInterval is the current (possibly backed-off) interval. + CurrentInterval time.Duration + + // MaxInterval caps how large intervals can grow. + MaxInterval time.Duration + + // ConsecutiveMiss counts pokes with no response. + ConsecutiveMiss int + + // LastPoke is when we last poked this agent. + LastPoke time.Time + + // LastActivity is when the agent last showed activity. + LastActivity time.Time +} + +// NewAgentBackoff creates backoff state for an agent. +func NewAgentBackoff(agentID string, config *BackoffConfig) *AgentBackoff { + if config == nil { + config = DefaultBackoffConfig() + } + return &AgentBackoff{ + AgentID: agentID, + BaseInterval: config.BaseInterval, + CurrentInterval: config.BaseInterval, + MaxInterval: config.MaxInterval, + } +} + +// ShouldPoke returns true if enough time has passed since the last poke. +func (ab *AgentBackoff) ShouldPoke() bool { + if ab.LastPoke.IsZero() { + return true // Never poked + } + return time.Since(ab.LastPoke) >= ab.CurrentInterval +} + +// RecordPoke records that we poked the agent. +func (ab *AgentBackoff) RecordPoke() { + ab.LastPoke = time.Now() +} + +// RecordMiss records that the agent didn't respond since last poke. +// This increases the backoff interval. +func (ab *AgentBackoff) RecordMiss(config *BackoffConfig) { + ab.ConsecutiveMiss++ + + if config == nil { + config = DefaultBackoffConfig() + } + + switch config.Strategy { + case StrategyFixed: + // No change + case StrategyGeometric: + ab.CurrentInterval = time.Duration(float64(ab.CurrentInterval) * config.Factor) + case StrategyExponential: + ab.CurrentInterval = ab.CurrentInterval * 2 + } + + // Cap at max interval + if ab.CurrentInterval > ab.MaxInterval { + ab.CurrentInterval = ab.MaxInterval + } +} + +// RecordActivity records that the agent showed activity. +// This resets the backoff to the base interval. +func (ab *AgentBackoff) RecordActivity() { + ab.ConsecutiveMiss = 0 + ab.CurrentInterval = ab.BaseInterval + ab.LastActivity = time.Now() +} + +// BackoffManager tracks backoff state for all agents. +type BackoffManager struct { + config *BackoffConfig + agents map[string]*AgentBackoff +} + +// NewBackoffManager creates a new backoff manager. +func NewBackoffManager(config *BackoffConfig) *BackoffManager { + if config == nil { + config = DefaultBackoffConfig() + } + return &BackoffManager{ + config: config, + agents: make(map[string]*AgentBackoff), + } +} + +// GetOrCreate returns backoff state for an agent, creating if needed. +func (bm *BackoffManager) GetOrCreate(agentID string) *AgentBackoff { + if ab, ok := bm.agents[agentID]; ok { + return ab + } + ab := NewAgentBackoff(agentID, bm.config) + bm.agents[agentID] = ab + return ab +} + +// ShouldPoke returns true if we should poke the given agent. +func (bm *BackoffManager) ShouldPoke(agentID string) bool { + return bm.GetOrCreate(agentID).ShouldPoke() +} + +// RecordPoke records that we poked an agent. +func (bm *BackoffManager) RecordPoke(agentID string) { + bm.GetOrCreate(agentID).RecordPoke() +} + +// RecordMiss records that an agent didn't respond. +func (bm *BackoffManager) RecordMiss(agentID string) { + bm.GetOrCreate(agentID).RecordMiss(bm.config) +} + +// RecordActivity records that an agent showed activity. +func (bm *BackoffManager) RecordActivity(agentID string) { + bm.GetOrCreate(agentID).RecordActivity() +} + +// GetInterval returns the current interval for an agent. +func (bm *BackoffManager) GetInterval(agentID string) time.Duration { + return bm.GetOrCreate(agentID).CurrentInterval +} + +// Stats returns a map of agent ID to current interval for logging. +func (bm *BackoffManager) Stats() map[string]time.Duration { + stats := make(map[string]time.Duration, len(bm.agents)) + for id, ab := range bm.agents { + stats[id] = ab.CurrentInterval + } + return stats +} diff --git a/internal/daemon/backoff_test.go b/internal/daemon/backoff_test.go new file mode 100644 index 00000000..3af1f82d --- /dev/null +++ b/internal/daemon/backoff_test.go @@ -0,0 +1,290 @@ +package daemon + +import ( + "testing" + "time" +) + +func TestDefaultBackoffConfig(t *testing.T) { + config := DefaultBackoffConfig() + + if config.Strategy != StrategyGeometric { + t.Errorf("expected strategy Geometric, got %v", config.Strategy) + } + if config.BaseInterval != 60*time.Second { + t.Errorf("expected base interval 60s, got %v", config.BaseInterval) + } + if config.MaxInterval != 10*time.Minute { + t.Errorf("expected max interval 10m, got %v", config.MaxInterval) + } + if config.Factor != 1.5 { + t.Errorf("expected factor 1.5, got %v", config.Factor) + } +} + +func TestNewAgentBackoff(t *testing.T) { + config := DefaultBackoffConfig() + ab := NewAgentBackoff("test-agent", config) + + if ab.AgentID != "test-agent" { + t.Errorf("expected agent ID 'test-agent', got %s", ab.AgentID) + } + if ab.BaseInterval != 60*time.Second { + t.Errorf("expected base interval 60s, got %v", ab.BaseInterval) + } + if ab.CurrentInterval != 60*time.Second { + t.Errorf("expected current interval 60s, got %v", ab.CurrentInterval) + } + if ab.ConsecutiveMiss != 0 { + t.Errorf("expected consecutive miss 0, got %d", ab.ConsecutiveMiss) + } +} + +func TestAgentBackoff_ShouldPoke(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyGeometric, + BaseInterval: 100 * time.Millisecond, // Short for testing + MaxInterval: 1 * time.Second, + Factor: 1.5, + } + ab := NewAgentBackoff("test", config) + + // Should poke immediately (never poked) + if !ab.ShouldPoke() { + t.Error("expected ShouldPoke=true for new agent") + } + + // Record a poke + ab.RecordPoke() + + // Should not poke immediately after + if ab.ShouldPoke() { + t.Error("expected ShouldPoke=false immediately after poke") + } + + // Wait for interval + time.Sleep(110 * time.Millisecond) + + // Now should poke again + if !ab.ShouldPoke() { + t.Error("expected ShouldPoke=true after interval elapsed") + } +} + +func TestAgentBackoff_GeometricBackoff(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyGeometric, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Second, + Factor: 1.5, + } + ab := NewAgentBackoff("test", config) + + // Initial interval + if ab.CurrentInterval != 100*time.Millisecond { + t.Errorf("expected initial interval 100ms, got %v", ab.CurrentInterval) + } + + // First miss: 100ms * 1.5 = 150ms + ab.RecordMiss(config) + if ab.CurrentInterval != 150*time.Millisecond { + t.Errorf("expected interval 150ms after 1 miss, got %v", ab.CurrentInterval) + } + if ab.ConsecutiveMiss != 1 { + t.Errorf("expected consecutive miss 1, got %d", ab.ConsecutiveMiss) + } + + // Second miss: 150ms * 1.5 = 225ms + ab.RecordMiss(config) + if ab.CurrentInterval != 225*time.Millisecond { + t.Errorf("expected interval 225ms after 2 misses, got %v", ab.CurrentInterval) + } + + // Third miss: 225ms * 1.5 = 337.5ms + ab.RecordMiss(config) + expected := time.Duration(337500000) // 337.5ms in nanoseconds + if ab.CurrentInterval != expected { + t.Errorf("expected interval ~337.5ms after 3 misses, got %v", ab.CurrentInterval) + } +} + +func TestAgentBackoff_ExponentialBackoff(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyExponential, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Second, + Factor: 2.0, // Ignored for exponential + } + ab := NewAgentBackoff("test", config) + + // First miss: 100ms * 2 = 200ms + ab.RecordMiss(config) + if ab.CurrentInterval != 200*time.Millisecond { + t.Errorf("expected interval 200ms after 1 miss, got %v", ab.CurrentInterval) + } + + // Second miss: 200ms * 2 = 400ms + ab.RecordMiss(config) + if ab.CurrentInterval != 400*time.Millisecond { + t.Errorf("expected interval 400ms after 2 misses, got %v", ab.CurrentInterval) + } + + // Third miss: 400ms * 2 = 800ms + ab.RecordMiss(config) + if ab.CurrentInterval != 800*time.Millisecond { + t.Errorf("expected interval 800ms after 3 misses, got %v", ab.CurrentInterval) + } +} + +func TestAgentBackoff_FixedStrategy(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyFixed, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Second, + Factor: 1.5, + } + ab := NewAgentBackoff("test", config) + + // Multiple misses should not change interval + ab.RecordMiss(config) + ab.RecordMiss(config) + ab.RecordMiss(config) + + if ab.CurrentInterval != 100*time.Millisecond { + t.Errorf("expected interval to stay at 100ms with fixed strategy, got %v", ab.CurrentInterval) + } + if ab.ConsecutiveMiss != 3 { + t.Errorf("expected consecutive miss 3, got %d", ab.ConsecutiveMiss) + } +} + +func TestAgentBackoff_MaxInterval(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyExponential, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 500 * time.Millisecond, + Factor: 2.0, + } + ab := NewAgentBackoff("test", config) + + // Keep missing until we hit the cap + for i := 0; i < 10; i++ { + ab.RecordMiss(config) + } + + if ab.CurrentInterval != 500*time.Millisecond { + t.Errorf("expected interval capped at 500ms, got %v", ab.CurrentInterval) + } +} + +func TestAgentBackoff_RecordActivity(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyGeometric, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Second, + Factor: 1.5, + } + ab := NewAgentBackoff("test", config) + + // Build up some backoff + ab.RecordMiss(config) + ab.RecordMiss(config) + ab.RecordMiss(config) + + if ab.CurrentInterval == 100*time.Millisecond { + t.Error("expected interval to have increased") + } + if ab.ConsecutiveMiss != 3 { + t.Errorf("expected consecutive miss 3, got %d", ab.ConsecutiveMiss) + } + + // Record activity - should reset + ab.RecordActivity() + + if ab.CurrentInterval != 100*time.Millisecond { + t.Errorf("expected interval reset to 100ms, got %v", ab.CurrentInterval) + } + if ab.ConsecutiveMiss != 0 { + t.Errorf("expected consecutive miss reset to 0, got %d", ab.ConsecutiveMiss) + } + if ab.LastActivity.IsZero() { + t.Error("expected LastActivity to be set") + } +} + +func TestBackoffManager_GetOrCreate(t *testing.T) { + bm := NewBackoffManager(DefaultBackoffConfig()) + + // First call creates + ab1 := bm.GetOrCreate("agent1") + if ab1 == nil { + t.Fatal("expected agent backoff to be created") + } + if ab1.AgentID != "agent1" { + t.Errorf("expected agent ID 'agent1', got %s", ab1.AgentID) + } + + // Second call returns same instance + ab2 := bm.GetOrCreate("agent1") + if ab1 != ab2 { + t.Error("expected same instance on second call") + } + + // Different agent creates new instance + ab3 := bm.GetOrCreate("agent2") + if ab1 == ab3 { + t.Error("expected different instance for different agent") + } +} + +func TestBackoffManager_Stats(t *testing.T) { + config := &BackoffConfig{ + Strategy: StrategyGeometric, + BaseInterval: 100 * time.Millisecond, + MaxInterval: 1 * time.Second, + Factor: 1.5, + } + bm := NewBackoffManager(config) + + // Create some agents with different backoff states + bm.RecordPoke("agent1") + bm.RecordMiss("agent1") + + bm.RecordPoke("agent2") + bm.RecordMiss("agent2") + bm.RecordMiss("agent2") + + stats := bm.Stats() + + if len(stats) != 2 { + t.Errorf("expected 2 agents in stats, got %d", len(stats)) + } + + // agent1: 100ms * 1.5 = 150ms + if stats["agent1"] != 150*time.Millisecond { + t.Errorf("expected agent1 interval 150ms, got %v", stats["agent1"]) + } + + // agent2: 100ms * 1.5 * 1.5 = 225ms + if stats["agent2"] != 225*time.Millisecond { + t.Errorf("expected agent2 interval 225ms, got %v", stats["agent2"]) + } +} + +func TestExtractRigName(t *testing.T) { + tests := []struct { + session string + expected string + }{ + {"gt-gastown-witness", "gastown"}, + {"gt-myrig-witness", "myrig"}, + {"gt-my-rig-name-witness", "my-rig-name"}, + } + + for _, tc := range tests { + result := extractRigName(tc.session) + if result != tc.expected { + t.Errorf("extractRigName(%q) = %q, expected %q", tc.session, result, tc.expected) + } + } +} diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 7db2e662..d648b8f9 100644 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -8,19 +8,22 @@ import ( "os/signal" "path/filepath" "strconv" + "strings" "syscall" "time" + "github.com/steveyegge/gastown/internal/keepalive" "github.com/steveyegge/gastown/internal/tmux" ) // Daemon is the town-level background service. type Daemon struct { - config *Config - tmux *tmux.Tmux - logger *log.Logger - ctx context.Context - cancel context.CancelFunc + config *Config + tmux *tmux.Tmux + logger *log.Logger + ctx context.Context + cancel context.CancelFunc + backoff *BackoffManager } // New creates a new daemon instance. @@ -41,11 +44,12 @@ func New(config *Config) (*Daemon, error) { ctx, cancel := context.WithCancel(context.Background()) return &Daemon{ - config: config, - tmux: tmux.NewTmux(), - logger: logger, - ctx: ctx, - cancel: cancel, + config: config, + tmux: tmux.NewTmux(), + logger: logger, + ctx: ctx, + cancel: cancel, + backoff: NewBackoffManager(DefaultBackoffConfig()), }, nil } @@ -124,6 +128,7 @@ func (d *Daemon) heartbeat(state *State) { // pokeMayor sends a heartbeat to the Mayor session. func (d *Daemon) pokeMayor() { const mayorSession = "gt-mayor" + const agentID = "mayor" running, err := d.tmux.HasSession(mayorSession) if err != nil { @@ -136,6 +141,22 @@ func (d *Daemon) pokeMayor() { return } + // Check keepalive to see if agent is active + state := keepalive.Read(d.config.TownRoot) + if state != nil && state.IsFresh() { + // Agent is actively working, reset backoff + d.backoff.RecordActivity(agentID) + d.logger.Printf("Mayor is fresh (cmd: %s), skipping poke", state.LastCommand) + return + } + + // Check if we should poke based on backoff interval + if !d.backoff.ShouldPoke(agentID) { + interval := d.backoff.GetInterval(agentID) + d.logger.Printf("Mayor backoff in effect (interval: %v), skipping poke", interval) + return + } + // Send heartbeat message via tmux msg := "HEARTBEAT: check your rigs" if err := d.tmux.SendKeys(mayorSession, msg); err != nil { @@ -143,7 +164,19 @@ func (d *Daemon) pokeMayor() { return } - d.logger.Println("Poked Mayor") + d.backoff.RecordPoke(agentID) + + // If agent is stale or very stale, record a miss (increase backoff) + if state == nil || state.IsVeryStale() { + d.backoff.RecordMiss(agentID) + interval := d.backoff.GetInterval(agentID) + d.logger.Printf("Poked Mayor (very stale, backoff now: %v)", interval) + } else if state.IsStale() { + // Stale but not very stale - don't increase backoff, but don't reset either + d.logger.Println("Poked Mayor (stale)") + } else { + d.logger.Println("Poked Mayor") + } } // pokeWitnesses sends heartbeats to all Witness sessions. @@ -162,16 +195,65 @@ func (d *Daemon) pokeWitnesses() { continue } - msg := "HEARTBEAT: check your workers" - if err := d.tmux.SendKeys(session, msg); err != nil { - d.logger.Printf("Error poking Witness %s: %v", session, err) - continue - } - - d.logger.Printf("Poked Witness: %s", session) + d.pokeWitness(session) } } +// pokeWitness sends a heartbeat to a single witness session with backoff. +func (d *Daemon) pokeWitness(session string) { + // Extract rig name from session (gt--witness -> ) + rigName := extractRigName(session) + agentID := session // Use session name as agent ID + + // Find the rig's workspace for keepalive check + rigWorkspace := filepath.Join(d.config.TownRoot, "gastown", rigName) + + // Check keepalive to see if the witness is active + state := keepalive.Read(rigWorkspace) + if state != nil && state.IsFresh() { + // Witness is actively working, reset backoff + d.backoff.RecordActivity(agentID) + d.logger.Printf("Witness %s is fresh (cmd: %s), skipping poke", session, state.LastCommand) + return + } + + // Check if we should poke based on backoff interval + if !d.backoff.ShouldPoke(agentID) { + interval := d.backoff.GetInterval(agentID) + d.logger.Printf("Witness %s backoff in effect (interval: %v), skipping poke", session, interval) + return + } + + // Send heartbeat message + msg := "HEARTBEAT: check your workers" + if err := d.tmux.SendKeys(session, msg); err != nil { + d.logger.Printf("Error poking Witness %s: %v", session, err) + return + } + + d.backoff.RecordPoke(agentID) + + // If agent is stale or very stale, record a miss (increase backoff) + if state == nil || state.IsVeryStale() { + d.backoff.RecordMiss(agentID) + interval := d.backoff.GetInterval(agentID) + d.logger.Printf("Poked Witness %s (very stale, backoff now: %v)", session, interval) + } else if state.IsStale() { + d.logger.Printf("Poked Witness %s (stale)", session) + } else { + d.logger.Printf("Poked Witness %s", session) + } +} + +// extractRigName extracts the rig name from a witness session name. +// "gt-gastown-witness" -> "gastown" +func extractRigName(session string) string { + // Remove "gt-" prefix and "-witness" suffix + name := strings.TrimPrefix(session, "gt-") + name = strings.TrimSuffix(name, "-witness") + return name +} + // isWitnessSession checks if a session name is a witness session. func isWitnessSession(name string) bool { // Pattern: gt--witness