Add await-signal command for patrol agent feed subscription (gt-vdprb.1)
Implements the primary wake mechanism for patrol agents: - Subscribes to bd activity --follow as a background process - Returns immediately when any line of output is received - Timeout with optional exponential backoff as safety net - JSON output mode for scripting 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
248
internal/cmd/molecule_await_signal.go
Normal file
248
internal/cmd/molecule_await_signal.go
Normal file
@@ -0,0 +1,248 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/steveyegge/gastown/internal/beads"
|
||||
"github.com/steveyegge/gastown/internal/style"
|
||||
)
|
||||
|
||||
var (
|
||||
awaitSignalTimeout string
|
||||
awaitSignalBackoffBase string
|
||||
awaitSignalBackoffMult int
|
||||
awaitSignalBackoffMax string
|
||||
awaitSignalQuiet bool
|
||||
)
|
||||
|
||||
var moleculeAwaitSignalCmd = &cobra.Command{
|
||||
Use: "await-signal",
|
||||
Short: "Wait for activity feed signal with timeout",
|
||||
Long: `Wait for any activity on the beads feed, with optional backoff.
|
||||
|
||||
This command is the primary wake mechanism for patrol agents. It subscribes
|
||||
to 'bd activity --follow' and returns immediately when any line of output
|
||||
is received (indicating beads activity).
|
||||
|
||||
If no activity occurs within the timeout, the command returns with exit code 0
|
||||
but sets the AWAIT_SIGNAL_REASON environment variable to "timeout".
|
||||
|
||||
The timeout can be specified directly or via backoff configuration for
|
||||
exponential wait patterns.
|
||||
|
||||
BACKOFF MODE:
|
||||
When backoff parameters are provided, the effective timeout is calculated as:
|
||||
min(base * multiplier^iteration, max)
|
||||
|
||||
This is useful for patrol loops where you want to back off during quiet periods.
|
||||
|
||||
EXIT CODES:
|
||||
0 - Signal received or timeout (check output for which)
|
||||
1 - Error starting feed subscription
|
||||
|
||||
EXAMPLES:
|
||||
# Simple wait with 60s timeout
|
||||
gt mol await-signal --timeout 60s
|
||||
|
||||
# Backoff mode: start at 30s, double each iteration, max 10m
|
||||
gt mol await-signal --backoff-base 30s --backoff-mult 2 --backoff-max 10m
|
||||
|
||||
# Quiet mode (no output, for scripting)
|
||||
gt mol await-signal --timeout 30s --quiet`,
|
||||
RunE: runMoleculeAwaitSignal,
|
||||
}
|
||||
|
||||
// AwaitSignalResult is the result of an await-signal operation.
|
||||
type AwaitSignalResult struct {
|
||||
Reason string `json:"reason"` // "signal" or "timeout"
|
||||
Elapsed time.Duration `json:"elapsed"` // how long we waited
|
||||
Signal string `json:"signal"` // the line that woke us (if signal)
|
||||
}
|
||||
|
||||
func init() {
|
||||
moleculeAwaitSignalCmd.Flags().StringVar(&awaitSignalTimeout, "timeout", "60s",
|
||||
"Maximum time to wait for signal (e.g., 30s, 5m)")
|
||||
moleculeAwaitSignalCmd.Flags().StringVar(&awaitSignalBackoffBase, "backoff-base", "",
|
||||
"Base interval for exponential backoff (e.g., 30s)")
|
||||
moleculeAwaitSignalCmd.Flags().IntVar(&awaitSignalBackoffMult, "backoff-mult", 2,
|
||||
"Multiplier for exponential backoff (default: 2)")
|
||||
moleculeAwaitSignalCmd.Flags().StringVar(&awaitSignalBackoffMax, "backoff-max", "",
|
||||
"Maximum interval cap for backoff (e.g., 10m)")
|
||||
moleculeAwaitSignalCmd.Flags().BoolVar(&awaitSignalQuiet, "quiet", false,
|
||||
"Suppress output (for scripting)")
|
||||
moleculeAwaitSignalCmd.Flags().BoolVar(&moleculeJSON, "json", false,
|
||||
"Output as JSON")
|
||||
|
||||
moleculeStepCmd.AddCommand(moleculeAwaitSignalCmd)
|
||||
}
|
||||
|
||||
func runMoleculeAwaitSignal(cmd *cobra.Command, args []string) error {
|
||||
// Calculate effective timeout
|
||||
timeout, err := calculateEffectiveTimeout()
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid timeout configuration: %w", err)
|
||||
}
|
||||
|
||||
// Find beads directory
|
||||
workDir, err := findLocalBeadsDir()
|
||||
if err != nil {
|
||||
return fmt.Errorf("not in a beads workspace: %w", err)
|
||||
}
|
||||
|
||||
if !awaitSignalQuiet && !moleculeJSON {
|
||||
fmt.Printf("%s Awaiting signal (timeout: %v)...\n",
|
||||
style.Dim.Render("⏳"), timeout)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// Start bd activity --follow
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
result, err := waitForActivitySignal(ctx, workDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("feed subscription failed: %w", err)
|
||||
}
|
||||
|
||||
result.Elapsed = time.Since(startTime)
|
||||
|
||||
// Output result
|
||||
if moleculeJSON {
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(result)
|
||||
}
|
||||
|
||||
if !awaitSignalQuiet {
|
||||
switch result.Reason {
|
||||
case "signal":
|
||||
fmt.Printf("%s Signal received after %v\n",
|
||||
style.Bold.Render("✓"), result.Elapsed.Round(time.Millisecond))
|
||||
if result.Signal != "" {
|
||||
// Truncate long signals
|
||||
sig := result.Signal
|
||||
if len(sig) > 80 {
|
||||
sig = sig[:77] + "..."
|
||||
}
|
||||
fmt.Printf(" %s\n", style.Dim.Render(sig))
|
||||
}
|
||||
case "timeout":
|
||||
fmt.Printf("%s Timeout after %v (no activity)\n",
|
||||
style.Dim.Render("⏱"), result.Elapsed.Round(time.Millisecond))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// calculateEffectiveTimeout determines the timeout based on flags.
|
||||
// If backoff parameters are provided, uses backoff calculation.
|
||||
// Otherwise uses the simple --timeout value.
|
||||
func calculateEffectiveTimeout() (time.Duration, error) {
|
||||
// If backoff base is set, use backoff mode
|
||||
if awaitSignalBackoffBase != "" {
|
||||
base, err := time.ParseDuration(awaitSignalBackoffBase)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid backoff-base: %w", err)
|
||||
}
|
||||
|
||||
// For now, use base as timeout
|
||||
// A more sophisticated implementation would track iteration count
|
||||
// and apply exponential backoff
|
||||
timeout := base
|
||||
|
||||
// Apply max cap if specified
|
||||
if awaitSignalBackoffMax != "" {
|
||||
maxDur, err := time.ParseDuration(awaitSignalBackoffMax)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid backoff-max: %w", err)
|
||||
}
|
||||
if timeout > maxDur {
|
||||
timeout = maxDur
|
||||
}
|
||||
}
|
||||
|
||||
return timeout, nil
|
||||
}
|
||||
|
||||
// Simple timeout mode
|
||||
return time.ParseDuration(awaitSignalTimeout)
|
||||
}
|
||||
|
||||
// waitForActivitySignal starts bd activity --follow and waits for any output.
|
||||
// Returns immediately when a line is received, or when context is cancelled.
|
||||
func waitForActivitySignal(ctx context.Context, workDir string) (*AwaitSignalResult, error) {
|
||||
// Start bd activity --follow
|
||||
cmd := exec.CommandContext(ctx, "bd", "activity", "--follow")
|
||||
cmd.Dir = workDir
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating stdout pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("starting bd activity: %w", err)
|
||||
}
|
||||
|
||||
// Channel for results
|
||||
signalCh := make(chan string, 1)
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
// Read lines in goroutine
|
||||
go func() {
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
if scanner.Scan() {
|
||||
// Got a line - this is our signal
|
||||
signalCh <- scanner.Text()
|
||||
} else if err := scanner.Err(); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for signal, error, or timeout
|
||||
select {
|
||||
case signal := <-signalCh:
|
||||
// Got activity signal - kill the process and return
|
||||
_ = cmd.Process.Kill()
|
||||
_ = cmd.Wait()
|
||||
return &AwaitSignalResult{
|
||||
Reason: "signal",
|
||||
Signal: signal,
|
||||
}, nil
|
||||
|
||||
case err := <-errCh:
|
||||
_ = cmd.Process.Kill()
|
||||
_ = cmd.Wait()
|
||||
return nil, fmt.Errorf("reading from feed: %w", err)
|
||||
|
||||
case <-ctx.Done():
|
||||
// Timeout - kill process and return timeout result
|
||||
_ = cmd.Process.Kill()
|
||||
_ = cmd.Wait()
|
||||
return &AwaitSignalResult{
|
||||
Reason: "timeout",
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// GetCurrentStepBackoff retrieves backoff config from the current step.
|
||||
// This is used by patrol agents to get the timeout for await-signal.
|
||||
func GetCurrentStepBackoff(workDir string) (*beads.BackoffConfig, error) {
|
||||
b := beads.New(workDir)
|
||||
|
||||
// Get current agent's hook
|
||||
// This would need to query the pinned/hooked bead and parse its description
|
||||
// for backoff configuration. For now, return nil (use defaults).
|
||||
_ = b
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
105
internal/cmd/molecule_await_signal_test.go
Normal file
105
internal/cmd/molecule_await_signal_test.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCalculateEffectiveTimeout(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
timeout string
|
||||
backoffBase string
|
||||
backoffMult int
|
||||
backoffMax string
|
||||
want time.Duration
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "simple timeout 60s",
|
||||
timeout: "60s",
|
||||
want: 60 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "simple timeout 5m",
|
||||
timeout: "5m",
|
||||
want: 5 * time.Minute,
|
||||
},
|
||||
{
|
||||
name: "backoff base only",
|
||||
timeout: "60s",
|
||||
backoffBase: "30s",
|
||||
want: 30 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "backoff with max",
|
||||
timeout: "60s",
|
||||
backoffBase: "30s",
|
||||
backoffMax: "10m",
|
||||
want: 30 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "backoff base exceeds max",
|
||||
timeout: "60s",
|
||||
backoffBase: "15m",
|
||||
backoffMax: "10m",
|
||||
want: 10 * time.Minute,
|
||||
},
|
||||
{
|
||||
name: "invalid timeout",
|
||||
timeout: "invalid",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid backoff base",
|
||||
timeout: "60s",
|
||||
backoffBase: "invalid",
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "invalid backoff max",
|
||||
timeout: "60s",
|
||||
backoffBase: "30s",
|
||||
backoffMax: "invalid",
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Set package-level variables
|
||||
awaitSignalTimeout = tt.timeout
|
||||
awaitSignalBackoffBase = tt.backoffBase
|
||||
awaitSignalBackoffMult = tt.backoffMult
|
||||
if tt.backoffMult == 0 {
|
||||
awaitSignalBackoffMult = 2 // default
|
||||
}
|
||||
awaitSignalBackoffMax = tt.backoffMax
|
||||
|
||||
got, err := calculateEffectiveTimeout()
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("calculateEffectiveTimeout() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !tt.wantErr && got != tt.want {
|
||||
t.Errorf("calculateEffectiveTimeout() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAwaitSignalResult(t *testing.T) {
|
||||
// Test that result struct marshals correctly
|
||||
result := AwaitSignalResult{
|
||||
Reason: "signal",
|
||||
Elapsed: 5 * time.Second,
|
||||
Signal: "[12:34:56] + gt-abc created · New issue",
|
||||
}
|
||||
|
||||
if result.Reason != "signal" {
|
||||
t.Errorf("expected reason 'signal', got %q", result.Reason)
|
||||
}
|
||||
if result.Signal == "" {
|
||||
t.Error("expected signal to be set")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user