feat(daemon): add adaptive backoff for heartbeat pokes

Implements per-agent backoff tracking to reduce noise for busy agents:
- AgentBackoff type tracks interval, miss count, and last activity
- BackoffManager manages state across all agents
- Geometric backoff strategy (1.5x factor, 10min cap)
- Integrates with keepalive to skip pokes when agents are fresh
- Resets backoff immediately when activity detected

Closes gt-8bx

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-19 12:09:26 -08:00
parent 557d0c6745
commit 3a477f673c
3 changed files with 577 additions and 18 deletions

187
internal/daemon/backoff.go Normal file
View File

@@ -0,0 +1,187 @@
package daemon
import (
"time"
)
// BackoffStrategy defines how intervals grow.
type BackoffStrategy string
const (
// StrategyFixed keeps the same interval (no backoff).
StrategyFixed BackoffStrategy = "fixed"
// StrategyGeometric multiplies by a factor each miss (1.5x).
StrategyGeometric BackoffStrategy = "geometric"
// StrategyExponential doubles interval each miss (2x).
StrategyExponential BackoffStrategy = "exponential"
)
// BackoffConfig holds backoff configuration.
type BackoffConfig struct {
// Strategy determines how intervals grow.
Strategy BackoffStrategy
// BaseInterval is the starting interval (default 60s).
BaseInterval time.Duration
// MaxInterval is the cap on how large intervals can grow (default 10m).
MaxInterval time.Duration
// Factor is the multiplier for geometric backoff (default 1.5).
Factor float64
}
// DefaultBackoffConfig returns sensible defaults.
func DefaultBackoffConfig() *BackoffConfig {
return &BackoffConfig{
Strategy: StrategyGeometric,
BaseInterval: 60 * time.Second,
MaxInterval: 10 * time.Minute,
Factor: 1.5,
}
}
// AgentBackoff tracks backoff state for a single agent.
type AgentBackoff struct {
// AgentID identifies the agent (e.g., "mayor", "gastown-witness").
AgentID string
// BaseInterval is the starting interval.
BaseInterval time.Duration
// CurrentInterval is the current (possibly backed-off) interval.
CurrentInterval time.Duration
// MaxInterval caps how large intervals can grow.
MaxInterval time.Duration
// ConsecutiveMiss counts pokes with no response.
ConsecutiveMiss int
// LastPoke is when we last poked this agent.
LastPoke time.Time
// LastActivity is when the agent last showed activity.
LastActivity time.Time
}
// NewAgentBackoff creates backoff state for an agent.
func NewAgentBackoff(agentID string, config *BackoffConfig) *AgentBackoff {
if config == nil {
config = DefaultBackoffConfig()
}
return &AgentBackoff{
AgentID: agentID,
BaseInterval: config.BaseInterval,
CurrentInterval: config.BaseInterval,
MaxInterval: config.MaxInterval,
}
}
// ShouldPoke returns true if enough time has passed since the last poke.
func (ab *AgentBackoff) ShouldPoke() bool {
if ab.LastPoke.IsZero() {
return true // Never poked
}
return time.Since(ab.LastPoke) >= ab.CurrentInterval
}
// RecordPoke records that we poked the agent.
func (ab *AgentBackoff) RecordPoke() {
ab.LastPoke = time.Now()
}
// RecordMiss records that the agent didn't respond since last poke.
// This increases the backoff interval.
func (ab *AgentBackoff) RecordMiss(config *BackoffConfig) {
ab.ConsecutiveMiss++
if config == nil {
config = DefaultBackoffConfig()
}
switch config.Strategy {
case StrategyFixed:
// No change
case StrategyGeometric:
ab.CurrentInterval = time.Duration(float64(ab.CurrentInterval) * config.Factor)
case StrategyExponential:
ab.CurrentInterval = ab.CurrentInterval * 2
}
// Cap at max interval
if ab.CurrentInterval > ab.MaxInterval {
ab.CurrentInterval = ab.MaxInterval
}
}
// RecordActivity records that the agent showed activity.
// This resets the backoff to the base interval.
func (ab *AgentBackoff) RecordActivity() {
ab.ConsecutiveMiss = 0
ab.CurrentInterval = ab.BaseInterval
ab.LastActivity = time.Now()
}
// BackoffManager tracks backoff state for all agents.
type BackoffManager struct {
config *BackoffConfig
agents map[string]*AgentBackoff
}
// NewBackoffManager creates a new backoff manager.
func NewBackoffManager(config *BackoffConfig) *BackoffManager {
if config == nil {
config = DefaultBackoffConfig()
}
return &BackoffManager{
config: config,
agents: make(map[string]*AgentBackoff),
}
}
// GetOrCreate returns backoff state for an agent, creating if needed.
func (bm *BackoffManager) GetOrCreate(agentID string) *AgentBackoff {
if ab, ok := bm.agents[agentID]; ok {
return ab
}
ab := NewAgentBackoff(agentID, bm.config)
bm.agents[agentID] = ab
return ab
}
// ShouldPoke returns true if we should poke the given agent.
func (bm *BackoffManager) ShouldPoke(agentID string) bool {
return bm.GetOrCreate(agentID).ShouldPoke()
}
// RecordPoke records that we poked an agent.
func (bm *BackoffManager) RecordPoke(agentID string) {
bm.GetOrCreate(agentID).RecordPoke()
}
// RecordMiss records that an agent didn't respond.
func (bm *BackoffManager) RecordMiss(agentID string) {
bm.GetOrCreate(agentID).RecordMiss(bm.config)
}
// RecordActivity records that an agent showed activity.
func (bm *BackoffManager) RecordActivity(agentID string) {
bm.GetOrCreate(agentID).RecordActivity()
}
// GetInterval returns the current interval for an agent.
func (bm *BackoffManager) GetInterval(agentID string) time.Duration {
return bm.GetOrCreate(agentID).CurrentInterval
}
// Stats returns a map of agent ID to current interval for logging.
func (bm *BackoffManager) Stats() map[string]time.Duration {
stats := make(map[string]time.Duration, len(bm.agents))
for id, ab := range bm.agents {
stats[id] = ab.CurrentInterval
}
return stats
}

View File

@@ -0,0 +1,290 @@
package daemon
import (
"testing"
"time"
)
func TestDefaultBackoffConfig(t *testing.T) {
config := DefaultBackoffConfig()
if config.Strategy != StrategyGeometric {
t.Errorf("expected strategy Geometric, got %v", config.Strategy)
}
if config.BaseInterval != 60*time.Second {
t.Errorf("expected base interval 60s, got %v", config.BaseInterval)
}
if config.MaxInterval != 10*time.Minute {
t.Errorf("expected max interval 10m, got %v", config.MaxInterval)
}
if config.Factor != 1.5 {
t.Errorf("expected factor 1.5, got %v", config.Factor)
}
}
func TestNewAgentBackoff(t *testing.T) {
config := DefaultBackoffConfig()
ab := NewAgentBackoff("test-agent", config)
if ab.AgentID != "test-agent" {
t.Errorf("expected agent ID 'test-agent', got %s", ab.AgentID)
}
if ab.BaseInterval != 60*time.Second {
t.Errorf("expected base interval 60s, got %v", ab.BaseInterval)
}
if ab.CurrentInterval != 60*time.Second {
t.Errorf("expected current interval 60s, got %v", ab.CurrentInterval)
}
if ab.ConsecutiveMiss != 0 {
t.Errorf("expected consecutive miss 0, got %d", ab.ConsecutiveMiss)
}
}
func TestAgentBackoff_ShouldPoke(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyGeometric,
BaseInterval: 100 * time.Millisecond, // Short for testing
MaxInterval: 1 * time.Second,
Factor: 1.5,
}
ab := NewAgentBackoff("test", config)
// Should poke immediately (never poked)
if !ab.ShouldPoke() {
t.Error("expected ShouldPoke=true for new agent")
}
// Record a poke
ab.RecordPoke()
// Should not poke immediately after
if ab.ShouldPoke() {
t.Error("expected ShouldPoke=false immediately after poke")
}
// Wait for interval
time.Sleep(110 * time.Millisecond)
// Now should poke again
if !ab.ShouldPoke() {
t.Error("expected ShouldPoke=true after interval elapsed")
}
}
func TestAgentBackoff_GeometricBackoff(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyGeometric,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Second,
Factor: 1.5,
}
ab := NewAgentBackoff("test", config)
// Initial interval
if ab.CurrentInterval != 100*time.Millisecond {
t.Errorf("expected initial interval 100ms, got %v", ab.CurrentInterval)
}
// First miss: 100ms * 1.5 = 150ms
ab.RecordMiss(config)
if ab.CurrentInterval != 150*time.Millisecond {
t.Errorf("expected interval 150ms after 1 miss, got %v", ab.CurrentInterval)
}
if ab.ConsecutiveMiss != 1 {
t.Errorf("expected consecutive miss 1, got %d", ab.ConsecutiveMiss)
}
// Second miss: 150ms * 1.5 = 225ms
ab.RecordMiss(config)
if ab.CurrentInterval != 225*time.Millisecond {
t.Errorf("expected interval 225ms after 2 misses, got %v", ab.CurrentInterval)
}
// Third miss: 225ms * 1.5 = 337.5ms
ab.RecordMiss(config)
expected := time.Duration(337500000) // 337.5ms in nanoseconds
if ab.CurrentInterval != expected {
t.Errorf("expected interval ~337.5ms after 3 misses, got %v", ab.CurrentInterval)
}
}
func TestAgentBackoff_ExponentialBackoff(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyExponential,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Second,
Factor: 2.0, // Ignored for exponential
}
ab := NewAgentBackoff("test", config)
// First miss: 100ms * 2 = 200ms
ab.RecordMiss(config)
if ab.CurrentInterval != 200*time.Millisecond {
t.Errorf("expected interval 200ms after 1 miss, got %v", ab.CurrentInterval)
}
// Second miss: 200ms * 2 = 400ms
ab.RecordMiss(config)
if ab.CurrentInterval != 400*time.Millisecond {
t.Errorf("expected interval 400ms after 2 misses, got %v", ab.CurrentInterval)
}
// Third miss: 400ms * 2 = 800ms
ab.RecordMiss(config)
if ab.CurrentInterval != 800*time.Millisecond {
t.Errorf("expected interval 800ms after 3 misses, got %v", ab.CurrentInterval)
}
}
func TestAgentBackoff_FixedStrategy(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyFixed,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Second,
Factor: 1.5,
}
ab := NewAgentBackoff("test", config)
// Multiple misses should not change interval
ab.RecordMiss(config)
ab.RecordMiss(config)
ab.RecordMiss(config)
if ab.CurrentInterval != 100*time.Millisecond {
t.Errorf("expected interval to stay at 100ms with fixed strategy, got %v", ab.CurrentInterval)
}
if ab.ConsecutiveMiss != 3 {
t.Errorf("expected consecutive miss 3, got %d", ab.ConsecutiveMiss)
}
}
func TestAgentBackoff_MaxInterval(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyExponential,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 500 * time.Millisecond,
Factor: 2.0,
}
ab := NewAgentBackoff("test", config)
// Keep missing until we hit the cap
for i := 0; i < 10; i++ {
ab.RecordMiss(config)
}
if ab.CurrentInterval != 500*time.Millisecond {
t.Errorf("expected interval capped at 500ms, got %v", ab.CurrentInterval)
}
}
func TestAgentBackoff_RecordActivity(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyGeometric,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Second,
Factor: 1.5,
}
ab := NewAgentBackoff("test", config)
// Build up some backoff
ab.RecordMiss(config)
ab.RecordMiss(config)
ab.RecordMiss(config)
if ab.CurrentInterval == 100*time.Millisecond {
t.Error("expected interval to have increased")
}
if ab.ConsecutiveMiss != 3 {
t.Errorf("expected consecutive miss 3, got %d", ab.ConsecutiveMiss)
}
// Record activity - should reset
ab.RecordActivity()
if ab.CurrentInterval != 100*time.Millisecond {
t.Errorf("expected interval reset to 100ms, got %v", ab.CurrentInterval)
}
if ab.ConsecutiveMiss != 0 {
t.Errorf("expected consecutive miss reset to 0, got %d", ab.ConsecutiveMiss)
}
if ab.LastActivity.IsZero() {
t.Error("expected LastActivity to be set")
}
}
func TestBackoffManager_GetOrCreate(t *testing.T) {
bm := NewBackoffManager(DefaultBackoffConfig())
// First call creates
ab1 := bm.GetOrCreate("agent1")
if ab1 == nil {
t.Fatal("expected agent backoff to be created")
}
if ab1.AgentID != "agent1" {
t.Errorf("expected agent ID 'agent1', got %s", ab1.AgentID)
}
// Second call returns same instance
ab2 := bm.GetOrCreate("agent1")
if ab1 != ab2 {
t.Error("expected same instance on second call")
}
// Different agent creates new instance
ab3 := bm.GetOrCreate("agent2")
if ab1 == ab3 {
t.Error("expected different instance for different agent")
}
}
func TestBackoffManager_Stats(t *testing.T) {
config := &BackoffConfig{
Strategy: StrategyGeometric,
BaseInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Second,
Factor: 1.5,
}
bm := NewBackoffManager(config)
// Create some agents with different backoff states
bm.RecordPoke("agent1")
bm.RecordMiss("agent1")
bm.RecordPoke("agent2")
bm.RecordMiss("agent2")
bm.RecordMiss("agent2")
stats := bm.Stats()
if len(stats) != 2 {
t.Errorf("expected 2 agents in stats, got %d", len(stats))
}
// agent1: 100ms * 1.5 = 150ms
if stats["agent1"] != 150*time.Millisecond {
t.Errorf("expected agent1 interval 150ms, got %v", stats["agent1"])
}
// agent2: 100ms * 1.5 * 1.5 = 225ms
if stats["agent2"] != 225*time.Millisecond {
t.Errorf("expected agent2 interval 225ms, got %v", stats["agent2"])
}
}
func TestExtractRigName(t *testing.T) {
tests := []struct {
session string
expected string
}{
{"gt-gastown-witness", "gastown"},
{"gt-myrig-witness", "myrig"},
{"gt-my-rig-name-witness", "my-rig-name"},
}
for _, tc := range tests {
result := extractRigName(tc.session)
if result != tc.expected {
t.Errorf("extractRigName(%q) = %q, expected %q", tc.session, result, tc.expected)
}
}
}

View File

@@ -8,19 +8,22 @@ import (
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/steveyegge/gastown/internal/keepalive"
"github.com/steveyegge/gastown/internal/tmux"
)
// Daemon is the town-level background service.
type Daemon struct {
config *Config
tmux *tmux.Tmux
logger *log.Logger
ctx context.Context
cancel context.CancelFunc
config *Config
tmux *tmux.Tmux
logger *log.Logger
ctx context.Context
cancel context.CancelFunc
backoff *BackoffManager
}
// New creates a new daemon instance.
@@ -41,11 +44,12 @@ func New(config *Config) (*Daemon, error) {
ctx, cancel := context.WithCancel(context.Background())
return &Daemon{
config: config,
tmux: tmux.NewTmux(),
logger: logger,
ctx: ctx,
cancel: cancel,
config: config,
tmux: tmux.NewTmux(),
logger: logger,
ctx: ctx,
cancel: cancel,
backoff: NewBackoffManager(DefaultBackoffConfig()),
}, nil
}
@@ -124,6 +128,7 @@ func (d *Daemon) heartbeat(state *State) {
// pokeMayor sends a heartbeat to the Mayor session.
func (d *Daemon) pokeMayor() {
const mayorSession = "gt-mayor"
const agentID = "mayor"
running, err := d.tmux.HasSession(mayorSession)
if err != nil {
@@ -136,6 +141,22 @@ func (d *Daemon) pokeMayor() {
return
}
// Check keepalive to see if agent is active
state := keepalive.Read(d.config.TownRoot)
if state != nil && state.IsFresh() {
// Agent is actively working, reset backoff
d.backoff.RecordActivity(agentID)
d.logger.Printf("Mayor is fresh (cmd: %s), skipping poke", state.LastCommand)
return
}
// Check if we should poke based on backoff interval
if !d.backoff.ShouldPoke(agentID) {
interval := d.backoff.GetInterval(agentID)
d.logger.Printf("Mayor backoff in effect (interval: %v), skipping poke", interval)
return
}
// Send heartbeat message via tmux
msg := "HEARTBEAT: check your rigs"
if err := d.tmux.SendKeys(mayorSession, msg); err != nil {
@@ -143,7 +164,19 @@ func (d *Daemon) pokeMayor() {
return
}
d.logger.Println("Poked Mayor")
d.backoff.RecordPoke(agentID)
// If agent is stale or very stale, record a miss (increase backoff)
if state == nil || state.IsVeryStale() {
d.backoff.RecordMiss(agentID)
interval := d.backoff.GetInterval(agentID)
d.logger.Printf("Poked Mayor (very stale, backoff now: %v)", interval)
} else if state.IsStale() {
// Stale but not very stale - don't increase backoff, but don't reset either
d.logger.Println("Poked Mayor (stale)")
} else {
d.logger.Println("Poked Mayor")
}
}
// pokeWitnesses sends heartbeats to all Witness sessions.
@@ -162,16 +195,65 @@ func (d *Daemon) pokeWitnesses() {
continue
}
msg := "HEARTBEAT: check your workers"
if err := d.tmux.SendKeys(session, msg); err != nil {
d.logger.Printf("Error poking Witness %s: %v", session, err)
continue
}
d.logger.Printf("Poked Witness: %s", session)
d.pokeWitness(session)
}
}
// pokeWitness sends a heartbeat to a single witness session with backoff.
func (d *Daemon) pokeWitness(session string) {
// Extract rig name from session (gt-<rig>-witness -> <rig>)
rigName := extractRigName(session)
agentID := session // Use session name as agent ID
// Find the rig's workspace for keepalive check
rigWorkspace := filepath.Join(d.config.TownRoot, "gastown", rigName)
// Check keepalive to see if the witness is active
state := keepalive.Read(rigWorkspace)
if state != nil && state.IsFresh() {
// Witness is actively working, reset backoff
d.backoff.RecordActivity(agentID)
d.logger.Printf("Witness %s is fresh (cmd: %s), skipping poke", session, state.LastCommand)
return
}
// Check if we should poke based on backoff interval
if !d.backoff.ShouldPoke(agentID) {
interval := d.backoff.GetInterval(agentID)
d.logger.Printf("Witness %s backoff in effect (interval: %v), skipping poke", session, interval)
return
}
// Send heartbeat message
msg := "HEARTBEAT: check your workers"
if err := d.tmux.SendKeys(session, msg); err != nil {
d.logger.Printf("Error poking Witness %s: %v", session, err)
return
}
d.backoff.RecordPoke(agentID)
// If agent is stale or very stale, record a miss (increase backoff)
if state == nil || state.IsVeryStale() {
d.backoff.RecordMiss(agentID)
interval := d.backoff.GetInterval(agentID)
d.logger.Printf("Poked Witness %s (very stale, backoff now: %v)", session, interval)
} else if state.IsStale() {
d.logger.Printf("Poked Witness %s (stale)", session)
} else {
d.logger.Printf("Poked Witness %s", session)
}
}
// extractRigName extracts the rig name from a witness session name.
// "gt-gastown-witness" -> "gastown"
func extractRigName(session string) string {
// Remove "gt-" prefix and "-witness" suffix
name := strings.TrimPrefix(session, "gt-")
name = strings.TrimSuffix(name, "-witness")
return name
}
// isWitnessSession checks if a session name is a witness session.
func isWitnessSession(name string) bool {
// Pattern: gt-<rig>-witness