Files
beads/cmd/bd/gate.go
2025-12-23 20:50:50 -08:00

600 lines
17 KiB
Go

package main
import (
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/spf13/cobra"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/storage/sqlite"
"github.com/steveyegge/beads/internal/types"
"github.com/steveyegge/beads/internal/ui"
"github.com/steveyegge/beads/internal/utils"
)
// Gate commands - async coordination primitives for agent workflows (bd-udsi)
//
// Gates are wisp issues that block until external conditions are met.
// They are managed by the Deacon patrol and enable agents to wait on:
// - GitHub Actions run completion (gh:run:<id>)
// - Pull request merge/close (gh:pr:<id>)
// - Simple timer delay (timer:<duration>)
// - Human approval (human:<prompt>)
// - Mail matching a pattern (mail:<pattern>)
//
// Usage:
// bd gate create --await gh:run:123 --timeout 30m --notify beads/dave
// bd gate show <id>
// bd gate list
// bd gate close <id> --reason "completed"
// bd gate wait <id> --notify beads/alice
var gateCmd = &cobra.Command{
Use: "gate",
Short: "Gate commands (async coordination)",
Long: `Manage gates - async coordination primitives for agent workflows.
Gates are ephemeral (wisp) issues that block until external conditions are met.
They are typically managed by the Deacon patrol.
Await types:
gh:run:<id> Wait for GitHub Actions run to complete
gh:pr:<id> Wait for pull request to be merged/closed
timer:<duration> Simple delay (e.g., timer:30m, timer:1h)
human:<prompt> Wait for human approval
mail:<pattern> Wait for mail matching pattern
Commands:
create Create a new gate
show Show gate details
list List open gates
close Close a gate
wait Add a waiter to an existing gate`,
}
var gateCreateCmd = &cobra.Command{
Use: "create",
Short: "Create a new gate",
Long: `Create a new gate to wait on an external condition.
The gate will be created as a wisp issue (ephemeral) and assigned to the
Deacon patrol for monitoring.
Examples:
bd gate create --await gh:run:123456789 --timeout 30m --notify beads/dave
bd gate create --await gh:pr:42 --timeout 1h --notify beads/dave beads/alice
bd gate create --await timer:15m --notify beads/dave
bd gate create --await human:approve-deploy --timeout 4h --notify beads/dave`,
Run: func(cmd *cobra.Command, args []string) {
CheckReadonly("gate create")
ctx := rootCtx
awaitSpec, _ := cmd.Flags().GetString("await")
timeoutStr, _ := cmd.Flags().GetString("timeout")
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
title, _ := cmd.Flags().GetString("title")
if awaitSpec == "" {
fmt.Fprintf(os.Stderr, "Error: --await is required\n")
os.Exit(1)
}
// Parse await spec into type and ID
awaitType, awaitID := parseAwaitSpec(awaitSpec)
if awaitType == "" {
fmt.Fprintf(os.Stderr, "Error: invalid await spec %q, expected format: type:id\n", awaitSpec)
fmt.Fprintf(os.Stderr, "Valid types: gh:run, gh:pr, timer, human, mail\n")
os.Exit(1)
}
// Parse timeout
var timeout time.Duration
if timeoutStr != "" {
var err error
timeout, err = time.ParseDuration(timeoutStr)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid timeout %q: %v\n", timeoutStr, err)
os.Exit(1)
}
}
// Generate title if not provided
if title == "" {
title = fmt.Sprintf("Gate: %s:%s", awaitType, awaitID)
}
var gate *types.Issue
// Try daemon first, fall back to direct store access
if daemonClient != nil {
resp, err := daemonClient.GateCreate(&rpc.GateCreateArgs{
Title: title,
AwaitType: awaitType,
AwaitID: awaitID,
Timeout: timeout,
Waiters: notifyAddrs,
})
if err != nil {
FatalError("gate create: %v", err)
}
// Parse the gate ID from response and fetch full gate
var result rpc.GateCreateResult
if err := json.Unmarshal(resp.Data, &result); err != nil {
FatalError("failed to parse gate create result: %v", err)
}
// Get the full gate for output
showResp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: result.ID})
if err != nil {
FatalError("failed to fetch created gate: %v", err)
}
if err := json.Unmarshal(showResp.Data, &gate); err != nil {
FatalError("failed to parse gate: %v", err)
}
} else if store != nil {
now := time.Now()
gate = &types.Issue{
// ID will be generated by CreateIssue
Title: title,
IssueType: types.TypeGate,
Status: types.StatusOpen,
Priority: 1, // Gates are typically high priority
Assignee: "deacon/",
Wisp: true, // Gates are wisps (ephemeral)
AwaitType: awaitType,
AwaitID: awaitID,
Timeout: timeout,
Waiters: notifyAddrs,
CreatedAt: now,
UpdatedAt: now,
}
gate.ContentHash = gate.ComputeContentHash()
if err := store.CreateIssue(ctx, gate, actor); err != nil {
fmt.Fprintf(os.Stderr, "Error creating gate: %v\n", err)
os.Exit(1)
}
markDirtyAndScheduleFlush()
} else {
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
os.Exit(1)
}
if jsonOutput {
outputJSON(gate)
return
}
fmt.Printf("%s Created gate: %s\n", ui.RenderPass("✓"), gate.ID)
fmt.Printf(" Await: %s:%s\n", awaitType, awaitID)
if timeout > 0 {
fmt.Printf(" Timeout: %v\n", timeout)
}
if len(notifyAddrs) > 0 {
fmt.Printf(" Notify: %s\n", strings.Join(notifyAddrs, ", "))
}
},
}
// parseAwaitSpec parses an await specification like "gh:run:123" or "timer:30m"
// Returns (awaitType, awaitID) or ("", "") if invalid
func parseAwaitSpec(spec string) (string, string) {
// Handle compound types like gh:run:123 and gh:pr:42
if strings.HasPrefix(spec, "gh:run:") {
return "gh:run", strings.TrimPrefix(spec, "gh:run:")
}
if strings.HasPrefix(spec, "gh:pr:") {
return "gh:pr", strings.TrimPrefix(spec, "gh:pr:")
}
// Handle simple types like timer:30m, human:approve, mail:pattern
parts := strings.SplitN(spec, ":", 2)
if len(parts) != 2 {
return "", ""
}
awaitType := parts[0]
awaitID := parts[1]
// Validate type
validTypes := map[string]bool{
"timer": true,
"human": true,
"mail": true,
}
if !validTypes[awaitType] {
return "", ""
}
return awaitType, awaitID
}
var gateShowCmd = &cobra.Command{
Use: "show <gate-id>",
Short: "Show gate details",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ctx := rootCtx
var gate *types.Issue
// Try daemon first, fall back to direct store access
if daemonClient != nil {
resp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: args[0]})
if err != nil {
FatalError("gate show: %v", err)
}
if err := json.Unmarshal(resp.Data, &gate); err != nil {
FatalError("failed to parse gate: %v", err)
}
} else if store != nil {
gateID, err := utils.ResolvePartialID(ctx, store, args[0])
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
gate, err = store.GetIssue(ctx, gateID)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
if gate == nil {
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
os.Exit(1)
}
if gate.IssueType != types.TypeGate {
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
os.Exit(1)
}
} else {
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
os.Exit(1)
}
if jsonOutput {
outputJSON(gate)
return
}
fmt.Printf("\n%s Gate: %s\n", ui.RenderAccent("🚧"), gate.ID)
fmt.Printf(" Title: %s\n", gate.Title)
fmt.Printf(" Status: %s\n", gate.Status)
fmt.Printf(" Await: %s:%s\n", gate.AwaitType, gate.AwaitID)
if gate.Timeout > 0 {
elapsed := time.Since(gate.CreatedAt)
remaining := gate.Timeout - elapsed
if remaining < 0 {
remaining = 0
}
fmt.Printf(" Timeout: %v (remaining: %v)\n", gate.Timeout, remaining.Round(time.Second))
}
if len(gate.Waiters) > 0 {
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
}
fmt.Printf(" Created: %s\n", gate.CreatedAt.Format("2006-01-02 15:04:05"))
if gate.CloseReason != "" {
fmt.Printf(" Close reason: %s\n", gate.CloseReason)
}
fmt.Println()
},
}
var gateListCmd = &cobra.Command{
Use: "list",
Short: "List open gates",
Run: func(cmd *cobra.Command, args []string) {
ctx := rootCtx
showAll, _ := cmd.Flags().GetBool("all")
var issues []*types.Issue
// Try daemon first, fall back to direct store access
if daemonClient != nil {
resp, err := daemonClient.GateList(&rpc.GateListArgs{All: showAll})
if err != nil {
FatalError("gate list: %v", err)
}
if err := json.Unmarshal(resp.Data, &issues); err != nil {
FatalError("failed to parse gates: %v", err)
}
} else if store != nil {
// Build filter for gates
gateType := types.TypeGate
filter := types.IssueFilter{
IssueType: &gateType,
}
if !showAll {
openStatus := types.StatusOpen
filter.Status = &openStatus
}
var err error
issues, err = store.SearchIssues(ctx, "", filter)
if err != nil {
fmt.Fprintf(os.Stderr, "Error listing gates: %v\n", err)
os.Exit(1)
}
} else {
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
os.Exit(1)
}
if jsonOutput {
outputJSON(issues)
return
}
if len(issues) == 0 {
fmt.Println("No gates found")
return
}
fmt.Printf("\n%s Gates (%d):\n\n", ui.RenderAccent("🚧"), len(issues))
for _, gate := range issues {
statusIcon := "⏳"
if gate.Status == types.StatusClosed {
statusIcon = "✓"
}
awaitSpec := fmt.Sprintf("%s:%s", gate.AwaitType, gate.AwaitID)
timeoutInfo := ""
if gate.Timeout > 0 {
elapsed := time.Since(gate.CreatedAt)
remaining := gate.Timeout - elapsed
if remaining < 0 {
timeoutInfo = " (timed out)"
} else {
timeoutInfo = fmt.Sprintf(" (%v left)", remaining.Round(time.Second))
}
}
fmt.Printf(" %s %s: %s%s\n", statusIcon, gate.ID, awaitSpec, timeoutInfo)
if len(gate.Waiters) > 0 {
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
}
}
fmt.Println()
},
}
var gateCloseCmd = &cobra.Command{
Use: "close <gate-id>",
Short: "Close a gate",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
CheckReadonly("gate close")
ctx := rootCtx
reason, _ := cmd.Flags().GetString("reason")
if reason == "" {
reason = "Gate closed"
}
var closedGate *types.Issue
var gateID string
// Try daemon first, fall back to direct store access
if daemonClient != nil {
resp, err := daemonClient.GateClose(&rpc.GateCloseArgs{
ID: args[0],
Reason: reason,
})
if err != nil {
FatalError("gate close: %v", err)
}
if err := json.Unmarshal(resp.Data, &closedGate); err != nil {
FatalError("failed to parse gate: %v", err)
}
gateID = closedGate.ID
} else if store != nil {
var err error
gateID, err = utils.ResolvePartialID(ctx, store, args[0])
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
// Verify it's a gate
gate, err := store.GetIssue(ctx, gateID)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
if gate == nil {
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
os.Exit(1)
}
if gate.IssueType != types.TypeGate {
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
os.Exit(1)
}
if err := store.CloseIssue(ctx, gateID, reason, actor); err != nil {
fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err)
os.Exit(1)
}
markDirtyAndScheduleFlush()
closedGate, _ = store.GetIssue(ctx, gateID)
} else {
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
os.Exit(1)
}
if jsonOutput {
outputJSON(closedGate)
return
}
fmt.Printf("%s Closed gate: %s\n", ui.RenderPass("✓"), gateID)
fmt.Printf(" Reason: %s\n", reason)
},
}
var gateWaitCmd = &cobra.Command{
Use: "wait <gate-id>",
Short: "Add a waiter to an existing gate",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
CheckReadonly("gate wait")
ctx := rootCtx
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
if len(notifyAddrs) == 0 {
fmt.Fprintf(os.Stderr, "Error: --notify is required\n")
os.Exit(1)
}
var addedCount int
var gateID string
var newWaiters []string
// Try daemon first, fall back to direct store access
if daemonClient != nil {
resp, err := daemonClient.GateWait(&rpc.GateWaitArgs{
ID: args[0],
Waiters: notifyAddrs,
})
if err != nil {
FatalError("gate wait: %v", err)
}
var result rpc.GateWaitResult
if err := json.Unmarshal(resp.Data, &result); err != nil {
FatalError("failed to parse gate wait result: %v", err)
}
addedCount = result.AddedCount
gateID = args[0] // Use the input ID for display
// For daemon mode, we don't know exactly which waiters were added
// Just report the count
newWaiters = nil
} else if store != nil {
var err error
gateID, err = utils.ResolvePartialID(ctx, store, args[0])
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
// Get existing gate
gate, err := store.GetIssue(ctx, gateID)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
if gate == nil {
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
os.Exit(1)
}
if gate.IssueType != types.TypeGate {
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
os.Exit(1)
}
if gate.Status == types.StatusClosed {
fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", gateID)
os.Exit(1)
}
// Add new waiters (avoiding duplicates)
waiterSet := make(map[string]bool)
for _, w := range gate.Waiters {
waiterSet[w] = true
}
for _, addr := range notifyAddrs {
if !waiterSet[addr] {
newWaiters = append(newWaiters, addr)
waiterSet[addr] = true
}
}
addedCount = len(newWaiters)
if addedCount == 0 {
fmt.Println("All specified waiters are already registered on this gate")
return
}
// Update waiters - need to use SQLite directly for Waiters field
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
if !ok {
fmt.Fprintf(os.Stderr, "Error: gate wait requires SQLite storage\n")
os.Exit(1)
}
allWaiters := append(gate.Waiters, newWaiters...)
waitersJSON, _ := json.Marshal(allWaiters)
// Use raw SQL to update the waiters field
_, err = sqliteStore.UnderlyingDB().ExecContext(ctx, `UPDATE issues SET waiters = ?, updated_at = ? WHERE id = ?`,
string(waitersJSON), time.Now(), gateID)
if err != nil {
fmt.Fprintf(os.Stderr, "Error adding waiters: %v\n", err)
os.Exit(1)
}
markDirtyAndScheduleFlush()
if jsonOutput {
updatedGate, _ := store.GetIssue(ctx, gateID)
outputJSON(updatedGate)
return
}
} else {
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
os.Exit(1)
}
if addedCount == 0 {
fmt.Println("All specified waiters are already registered on this gate")
return
}
if jsonOutput {
// For daemon mode, output the result
outputJSON(map[string]interface{}{"added_count": addedCount, "gate_id": gateID})
return
}
fmt.Printf("%s Added %d waiter(s) to gate %s\n", ui.RenderPass("✓"), addedCount, gateID)
for _, addr := range newWaiters {
fmt.Printf(" + %s\n", addr)
}
},
}
func init() {
// Gate create flags
gateCreateCmd.Flags().String("await", "", "Await spec: gh:run:<id>, gh:pr:<id>, timer:<duration>, human:<prompt>, mail:<pattern> (required)")
gateCreateCmd.Flags().String("timeout", "", "Timeout duration (e.g., 30m, 1h)")
gateCreateCmd.Flags().StringSlice("notify", nil, "Mail addresses to notify when gate clears (repeatable)")
gateCreateCmd.Flags().String("title", "", "Custom title for the gate")
gateCreateCmd.Flags().Bool("json", false, "Output JSON format")
// Gate show flags
gateShowCmd.Flags().Bool("json", false, "Output JSON format")
// Gate list flags
gateListCmd.Flags().Bool("all", false, "Show all gates including closed")
gateListCmd.Flags().Bool("json", false, "Output JSON format")
// Gate close flags
gateCloseCmd.Flags().StringP("reason", "r", "", "Reason for closing")
gateCloseCmd.Flags().Bool("json", false, "Output JSON format")
// Gate wait flags
gateWaitCmd.Flags().StringSlice("notify", nil, "Mail addresses to add as waiters (repeatable, required)")
gateWaitCmd.Flags().Bool("json", false, "Output JSON format")
// Add subcommands to gate command
gateCmd.AddCommand(gateCreateCmd)
gateCmd.AddCommand(gateShowCmd)
gateCmd.AddCommand(gateListCmd)
gateCmd.AddCommand(gateCloseCmd)
gateCmd.AddCommand(gateWaitCmd)
// Add gate command to root
rootCmd.AddCommand(gateCmd)
}