Add async gates - coordination primitives for agents to wait on external events like CI completion, PR merges, timers, or human approval. Changes: - Add 'gate' issue type to types.go with gate-specific fields: - AwaitType: condition type (gh:run, gh:pr, timer, human, mail) - AwaitID: condition identifier - Timeout: max wait duration - Waiters: mail addresses to notify when gate clears - Add SQLite migration 027_gate_columns for new fields - Update all SQLite storage queries to handle gate fields - Add bd gate commands: create, show, list, close, wait - All commands support --json output and --no-daemon mode Closes: bd-2v0f, bd-lz49, bd-u66e 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
525 lines
15 KiB
Go
525 lines
15 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
"github.com/steveyegge/beads/internal/storage/sqlite"
|
|
"github.com/steveyegge/beads/internal/types"
|
|
"github.com/steveyegge/beads/internal/ui"
|
|
"github.com/steveyegge/beads/internal/utils"
|
|
)
|
|
|
|
// Gate commands - async coordination primitives for agent workflows (bd-udsi)
|
|
//
|
|
// Gates are wisp issues that block until external conditions are met.
|
|
// They are managed by the Deacon patrol and enable agents to wait on:
|
|
// - GitHub Actions run completion (gh:run:<id>)
|
|
// - Pull request merge/close (gh:pr:<id>)
|
|
// - Simple timer delay (timer:<duration>)
|
|
// - Human approval (human:<prompt>)
|
|
// - Mail matching a pattern (mail:<pattern>)
|
|
//
|
|
// Usage:
|
|
// bd gate create --await gh:run:123 --timeout 30m --notify beads/dave
|
|
// bd gate show <id>
|
|
// bd gate list
|
|
// bd gate close <id> --reason "completed"
|
|
// bd gate wait <id> --notify beads/alice
|
|
|
|
var gateCmd = &cobra.Command{
|
|
Use: "gate",
|
|
Short: "Gate commands (async coordination)",
|
|
Long: `Manage gates - async coordination primitives for agent workflows.
|
|
|
|
Gates are ephemeral (wisp) issues that block until external conditions are met.
|
|
They are typically managed by the Deacon patrol.
|
|
|
|
Await types:
|
|
gh:run:<id> Wait for GitHub Actions run to complete
|
|
gh:pr:<id> Wait for pull request to be merged/closed
|
|
timer:<duration> Simple delay (e.g., timer:30m, timer:1h)
|
|
human:<prompt> Wait for human approval
|
|
mail:<pattern> Wait for mail matching pattern
|
|
|
|
Commands:
|
|
create Create a new gate
|
|
show Show gate details
|
|
list List open gates
|
|
close Close a gate
|
|
wait Add a waiter to an existing gate`,
|
|
}
|
|
|
|
var gateCreateCmd = &cobra.Command{
|
|
Use: "create",
|
|
Short: "Create a new gate",
|
|
Long: `Create a new gate to wait on an external condition.
|
|
|
|
The gate will be created as a wisp issue (ephemeral) and assigned to the
|
|
Deacon patrol for monitoring.
|
|
|
|
Examples:
|
|
bd gate create --await gh:run:123456789 --timeout 30m --notify beads/dave
|
|
bd gate create --await gh:pr:42 --timeout 1h --notify beads/dave beads/alice
|
|
bd gate create --await timer:15m --notify beads/dave
|
|
bd gate create --await human:approve-deploy --timeout 4h --notify beads/dave`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate create")
|
|
ctx := rootCtx
|
|
|
|
awaitSpec, _ := cmd.Flags().GetString("await")
|
|
timeoutStr, _ := cmd.Flags().GetString("timeout")
|
|
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
|
|
title, _ := cmd.Flags().GetString("title")
|
|
|
|
if awaitSpec == "" {
|
|
fmt.Fprintf(os.Stderr, "Error: --await is required\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Parse await spec into type and ID
|
|
awaitType, awaitID := parseAwaitSpec(awaitSpec)
|
|
if awaitType == "" {
|
|
fmt.Fprintf(os.Stderr, "Error: invalid await spec %q, expected format: type:id\n", awaitSpec)
|
|
fmt.Fprintf(os.Stderr, "Valid types: gh:run, gh:pr, timer, human, mail\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Parse timeout
|
|
var timeout time.Duration
|
|
if timeoutStr != "" {
|
|
var err error
|
|
timeout, err = time.ParseDuration(timeoutStr)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: invalid timeout %q: %v\n", timeoutStr, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// Generate title if not provided
|
|
if title == "" {
|
|
title = fmt.Sprintf("Gate: %s:%s", awaitType, awaitID)
|
|
}
|
|
|
|
// Gate creation requires direct store access
|
|
if store == nil {
|
|
if daemonClient != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate create requires direct database access\n")
|
|
fmt.Fprintf(os.Stderr, "Hint: use --no-daemon flag: bd --no-daemon gate create ...\n")
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
}
|
|
os.Exit(1)
|
|
}
|
|
|
|
now := time.Now()
|
|
gate := &types.Issue{
|
|
// ID will be generated by CreateIssue
|
|
Title: title,
|
|
IssueType: types.TypeGate,
|
|
Status: types.StatusOpen,
|
|
Priority: 1, // Gates are typically high priority
|
|
Assignee: "deacon/",
|
|
Wisp: true, // Gates are wisps (ephemeral)
|
|
AwaitType: awaitType,
|
|
AwaitID: awaitID,
|
|
Timeout: timeout,
|
|
Waiters: notifyAddrs,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
gate.ContentHash = gate.ComputeContentHash()
|
|
|
|
if err := store.CreateIssue(ctx, gate, actor); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error creating gate: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
|
|
if jsonOutput {
|
|
outputJSON(gate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Created gate: %s\n", ui.RenderPass("✓"), gate.ID)
|
|
fmt.Printf(" Await: %s:%s\n", awaitType, awaitID)
|
|
if timeout > 0 {
|
|
fmt.Printf(" Timeout: %v\n", timeout)
|
|
}
|
|
if len(notifyAddrs) > 0 {
|
|
fmt.Printf(" Notify: %s\n", strings.Join(notifyAddrs, ", "))
|
|
}
|
|
},
|
|
}
|
|
|
|
// parseAwaitSpec parses an await specification like "gh:run:123" or "timer:30m"
|
|
// Returns (awaitType, awaitID) or ("", "") if invalid
|
|
func parseAwaitSpec(spec string) (string, string) {
|
|
// Handle compound types like gh:run:123 and gh:pr:42
|
|
if strings.HasPrefix(spec, "gh:run:") {
|
|
return "gh:run", strings.TrimPrefix(spec, "gh:run:")
|
|
}
|
|
if strings.HasPrefix(spec, "gh:pr:") {
|
|
return "gh:pr", strings.TrimPrefix(spec, "gh:pr:")
|
|
}
|
|
|
|
// Handle simple types like timer:30m, human:approve, mail:pattern
|
|
parts := strings.SplitN(spec, ":", 2)
|
|
if len(parts) != 2 {
|
|
return "", ""
|
|
}
|
|
|
|
awaitType := parts[0]
|
|
awaitID := parts[1]
|
|
|
|
// Validate type
|
|
validTypes := map[string]bool{
|
|
"timer": true,
|
|
"human": true,
|
|
"mail": true,
|
|
}
|
|
if !validTypes[awaitType] {
|
|
return "", ""
|
|
}
|
|
|
|
return awaitType, awaitID
|
|
}
|
|
|
|
var gateShowCmd = &cobra.Command{
|
|
Use: "show <gate-id>",
|
|
Short: "Show gate details",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
ctx := rootCtx
|
|
|
|
// Gate show requires direct store access
|
|
if store == nil {
|
|
if daemonClient != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate show requires direct database access\n")
|
|
fmt.Fprintf(os.Stderr, "Hint: use --no-daemon flag: bd --no-daemon gate show %s\n", args[0])
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
}
|
|
os.Exit(1)
|
|
}
|
|
|
|
gateID, err := utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(gate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("\n%s Gate: %s\n", ui.RenderAccent("🚧"), gate.ID)
|
|
fmt.Printf(" Title: %s\n", gate.Title)
|
|
fmt.Printf(" Status: %s\n", gate.Status)
|
|
fmt.Printf(" Await: %s:%s\n", gate.AwaitType, gate.AwaitID)
|
|
if gate.Timeout > 0 {
|
|
elapsed := time.Since(gate.CreatedAt)
|
|
remaining := gate.Timeout - elapsed
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
fmt.Printf(" Timeout: %v (remaining: %v)\n", gate.Timeout, remaining.Round(time.Second))
|
|
}
|
|
if len(gate.Waiters) > 0 {
|
|
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
|
|
}
|
|
fmt.Printf(" Created: %s\n", gate.CreatedAt.Format("2006-01-02 15:04:05"))
|
|
if gate.CloseReason != "" {
|
|
fmt.Printf(" Close reason: %s\n", gate.CloseReason)
|
|
}
|
|
fmt.Println()
|
|
},
|
|
}
|
|
|
|
var gateListCmd = &cobra.Command{
|
|
Use: "list",
|
|
Short: "List open gates",
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
ctx := rootCtx
|
|
showAll, _ := cmd.Flags().GetBool("all")
|
|
|
|
// Gate list requires direct store access
|
|
if store == nil {
|
|
if daemonClient != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate list requires direct database access\n")
|
|
fmt.Fprintf(os.Stderr, "Hint: use --no-daemon flag: bd --no-daemon gate list\n")
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
}
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Build filter for gates
|
|
gateType := types.TypeGate
|
|
filter := types.IssueFilter{
|
|
IssueType: &gateType,
|
|
}
|
|
if !showAll {
|
|
openStatus := types.StatusOpen
|
|
filter.Status = &openStatus
|
|
}
|
|
|
|
issues, err := store.SearchIssues(ctx, "", filter)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error listing gates: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(issues)
|
|
return
|
|
}
|
|
|
|
if len(issues) == 0 {
|
|
fmt.Println("No gates found")
|
|
return
|
|
}
|
|
|
|
fmt.Printf("\n%s Gates (%d):\n\n", ui.RenderAccent("🚧"), len(issues))
|
|
for _, gate := range issues {
|
|
statusIcon := "⏳"
|
|
if gate.Status == types.StatusClosed {
|
|
statusIcon = "✓"
|
|
}
|
|
awaitSpec := fmt.Sprintf("%s:%s", gate.AwaitType, gate.AwaitID)
|
|
timeoutInfo := ""
|
|
if gate.Timeout > 0 {
|
|
elapsed := time.Since(gate.CreatedAt)
|
|
remaining := gate.Timeout - elapsed
|
|
if remaining < 0 {
|
|
timeoutInfo = " (timed out)"
|
|
} else {
|
|
timeoutInfo = fmt.Sprintf(" (%v left)", remaining.Round(time.Second))
|
|
}
|
|
}
|
|
fmt.Printf(" %s %s: %s%s\n", statusIcon, gate.ID, awaitSpec, timeoutInfo)
|
|
if len(gate.Waiters) > 0 {
|
|
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
|
|
}
|
|
}
|
|
fmt.Println()
|
|
},
|
|
}
|
|
|
|
var gateCloseCmd = &cobra.Command{
|
|
Use: "close <gate-id>",
|
|
Short: "Close a gate",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate close")
|
|
ctx := rootCtx
|
|
reason, _ := cmd.Flags().GetString("reason")
|
|
if reason == "" {
|
|
reason = "Gate closed"
|
|
}
|
|
|
|
// Gate close requires direct store access
|
|
if store == nil {
|
|
if daemonClient != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate close requires direct database access\n")
|
|
fmt.Fprintf(os.Stderr, "Hint: use --no-daemon flag: bd --no-daemon gate close %s\n", args[0])
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
}
|
|
os.Exit(1)
|
|
}
|
|
|
|
gateID, err := utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Verify it's a gate
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if err := store.CloseIssue(ctx, gateID, reason, actor); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
|
|
if jsonOutput {
|
|
closedGate, _ := store.GetIssue(ctx, gateID)
|
|
outputJSON(closedGate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Closed gate: %s\n", ui.RenderPass("✓"), gateID)
|
|
fmt.Printf(" Reason: %s\n", reason)
|
|
},
|
|
}
|
|
|
|
var gateWaitCmd = &cobra.Command{
|
|
Use: "wait <gate-id>",
|
|
Short: "Add a waiter to an existing gate",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate wait")
|
|
ctx := rootCtx
|
|
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
|
|
|
|
if len(notifyAddrs) == 0 {
|
|
fmt.Fprintf(os.Stderr, "Error: --notify is required\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Gate wait requires direct store access for now
|
|
if store == nil {
|
|
if daemonClient != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate wait requires direct database access\n")
|
|
fmt.Fprintf(os.Stderr, "Hint: use --no-daemon flag: bd --no-daemon gate wait %s --notify ...\n", args[0])
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
}
|
|
os.Exit(1)
|
|
}
|
|
|
|
gateID, err := utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Get existing gate
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
if gate.Status == types.StatusClosed {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Add new waiters (avoiding duplicates)
|
|
waiterSet := make(map[string]bool)
|
|
for _, w := range gate.Waiters {
|
|
waiterSet[w] = true
|
|
}
|
|
newWaiters := []string{}
|
|
for _, addr := range notifyAddrs {
|
|
if !waiterSet[addr] {
|
|
newWaiters = append(newWaiters, addr)
|
|
waiterSet[addr] = true
|
|
}
|
|
}
|
|
|
|
if len(newWaiters) == 0 {
|
|
fmt.Println("All specified waiters are already registered on this gate")
|
|
return
|
|
}
|
|
|
|
// Update waiters - need to use SQLite directly for Waiters field
|
|
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
|
if !ok {
|
|
fmt.Fprintf(os.Stderr, "Error: gate wait requires SQLite storage\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
allWaiters := append(gate.Waiters, newWaiters...)
|
|
waitersJSON, _ := json.Marshal(allWaiters)
|
|
|
|
// Use raw SQL to update the waiters field
|
|
_, err = sqliteStore.UnderlyingDB().ExecContext(ctx, `UPDATE issues SET waiters = ?, updated_at = ? WHERE id = ?`,
|
|
string(waitersJSON), time.Now(), gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error adding waiters: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
|
|
if jsonOutput {
|
|
updatedGate, _ := store.GetIssue(ctx, gateID)
|
|
outputJSON(updatedGate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Added waiter(s) to gate %s:\n", ui.RenderPass("✓"), gateID)
|
|
for _, addr := range newWaiters {
|
|
fmt.Printf(" + %s\n", addr)
|
|
}
|
|
},
|
|
}
|
|
|
|
func init() {
|
|
// Gate create flags
|
|
gateCreateCmd.Flags().String("await", "", "Await spec: gh:run:<id>, gh:pr:<id>, timer:<duration>, human:<prompt>, mail:<pattern> (required)")
|
|
gateCreateCmd.Flags().String("timeout", "", "Timeout duration (e.g., 30m, 1h)")
|
|
gateCreateCmd.Flags().StringSlice("notify", nil, "Mail addresses to notify when gate clears (repeatable)")
|
|
gateCreateCmd.Flags().String("title", "", "Custom title for the gate")
|
|
gateCreateCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate show flags
|
|
gateShowCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate list flags
|
|
gateListCmd.Flags().Bool("all", false, "Show all gates including closed")
|
|
gateListCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate close flags
|
|
gateCloseCmd.Flags().StringP("reason", "r", "", "Reason for closing")
|
|
gateCloseCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate wait flags
|
|
gateWaitCmd.Flags().StringSlice("notify", nil, "Mail addresses to add as waiters (repeatable, required)")
|
|
gateWaitCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Add subcommands to gate command
|
|
gateCmd.AddCommand(gateCreateCmd)
|
|
gateCmd.AddCommand(gateShowCmd)
|
|
gateCmd.AddCommand(gateListCmd)
|
|
gateCmd.AddCommand(gateCloseCmd)
|
|
gateCmd.AddCommand(gateWaitCmd)
|
|
|
|
// Add gate command to root
|
|
rootCmd.AddCommand(gateCmd)
|
|
}
|