package main import ( "encoding/json" "fmt" "os" "strings" "time" "github.com/spf13/cobra" "github.com/steveyegge/beads/internal/rpc" "github.com/steveyegge/beads/internal/storage/sqlite" "github.com/steveyegge/beads/internal/types" "github.com/steveyegge/beads/internal/ui" "github.com/steveyegge/beads/internal/utils" ) // Gate commands - async coordination primitives for agent workflows (bd-udsi) // // Gates are wisp issues that block until external conditions are met. // They enable agents to wait on: // - GitHub Actions run completion (gh:run:) // - Pull request merge/close (gh:pr:) // - Simple timer delay (timer:) // - Human approval (human:) // - Mail matching a pattern (mail:) // // Usage: // bd gate create --await gh:run:123 --timeout 30m --notify beads/dave // bd gate show // bd gate list // bd gate close --reason "completed" // bd gate wait --notify beads/alice var gateCmd = &cobra.Command{ Use: "gate", Short: "Gate commands (async coordination)", Long: `Manage gates - async coordination primitives for agent workflows. Gates are ephemeral (wisp) issues that block until external conditions are met. The orchestrator is responsible for monitoring and closing gates. Await types: gh:run: Wait for GitHub Actions run to complete gh:pr: Wait for pull request to be merged/closed timer: Simple delay (e.g., timer:30m, timer:1h) human: Wait for human approval mail: Wait for mail matching pattern Commands: create Create a new gate show Show gate details list List open gates close Close a gate wait Add a waiter to an existing gate`, } var gateCreateCmd = &cobra.Command{ Use: "create", Short: "Create a new gate", Long: `Create a new gate to wait on an external condition. The gate will be created as a wisp issue (ephemeral). The orchestrator is responsible for monitoring gates and closing them when conditions are met. Examples: bd gate create --await gh:run:123456789 --timeout 30m --notify beads/dave bd gate create --await gh:pr:42 --timeout 1h --notify beads/dave beads/alice bd gate create --await timer:15m --notify beads/dave bd gate create --await human:approve-deploy --timeout 4h --notify beads/dave`, Run: func(cmd *cobra.Command, args []string) { CheckReadonly("gate create") ctx := rootCtx awaitSpec, _ := cmd.Flags().GetString("await") timeoutStr, _ := cmd.Flags().GetString("timeout") notifyAddrs, _ := cmd.Flags().GetStringSlice("notify") title, _ := cmd.Flags().GetString("title") if awaitSpec == "" { fmt.Fprintf(os.Stderr, "Error: --await is required\n") os.Exit(1) } // Parse await spec into type and ID awaitType, awaitID := parseAwaitSpec(awaitSpec) if awaitType == "" { fmt.Fprintf(os.Stderr, "Error: invalid await spec %q, expected format: type:id\n", awaitSpec) fmt.Fprintf(os.Stderr, "Valid types: gh:run, gh:pr, timer, human, mail\n") os.Exit(1) } // Parse timeout var timeout time.Duration if timeoutStr != "" { var err error timeout, err = time.ParseDuration(timeoutStr) if err != nil { fmt.Fprintf(os.Stderr, "Error: invalid timeout %q: %v\n", timeoutStr, err) os.Exit(1) } } // Generate title if not provided if title == "" { title = fmt.Sprintf("Gate: %s:%s", awaitType, awaitID) } var gate *types.Issue // Try daemon first, fall back to direct store access if daemonClient != nil { resp, err := daemonClient.GateCreate(&rpc.GateCreateArgs{ Title: title, AwaitType: awaitType, AwaitID: awaitID, Timeout: timeout, Waiters: notifyAddrs, }) if err != nil { FatalError("gate create: %v", err) } // Parse the gate ID from response and fetch full gate var result rpc.GateCreateResult if err := json.Unmarshal(resp.Data, &result); err != nil { FatalError("failed to parse gate create result: %v", err) } // Get the full gate for output showResp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: result.ID}) if err != nil { FatalError("failed to fetch created gate: %v", err) } if err := json.Unmarshal(showResp.Data, &gate); err != nil { FatalError("failed to parse gate: %v", err) } } else if store != nil { now := time.Now() gate = &types.Issue{ // ID will be generated by CreateIssue Title: title, IssueType: types.TypeGate, Status: types.StatusOpen, Priority: 1, // Gates are typically high priority // Assignee left empty - orchestrator decides who processes gates Wisp: true, // Gates are wisps (ephemeral) AwaitType: awaitType, AwaitID: awaitID, Timeout: timeout, Waiters: notifyAddrs, CreatedAt: now, UpdatedAt: now, } gate.ContentHash = gate.ComputeContentHash() if err := store.CreateIssue(ctx, gate, actor); err != nil { fmt.Fprintf(os.Stderr, "Error creating gate: %v\n", err) os.Exit(1) } markDirtyAndScheduleFlush() } else { fmt.Fprintf(os.Stderr, "Error: no database connection\n") os.Exit(1) } if jsonOutput { outputJSON(gate) return } fmt.Printf("%s Created gate: %s\n", ui.RenderPass("✓"), gate.ID) fmt.Printf(" Await: %s:%s\n", awaitType, awaitID) if timeout > 0 { fmt.Printf(" Timeout: %v\n", timeout) } if len(notifyAddrs) > 0 { fmt.Printf(" Notify: %s\n", strings.Join(notifyAddrs, ", ")) } }, } // parseAwaitSpec parses an await specification like "gh:run:123" or "timer:30m" // Returns (awaitType, awaitID) or ("", "") if invalid func parseAwaitSpec(spec string) (string, string) { // Handle compound types like gh:run:123 and gh:pr:42 if strings.HasPrefix(spec, "gh:run:") { return "gh:run", strings.TrimPrefix(spec, "gh:run:") } if strings.HasPrefix(spec, "gh:pr:") { return "gh:pr", strings.TrimPrefix(spec, "gh:pr:") } // Handle simple types like timer:30m, human:approve, mail:pattern parts := strings.SplitN(spec, ":", 2) if len(parts) != 2 { return "", "" } awaitType := parts[0] awaitID := parts[1] // Validate type validTypes := map[string]bool{ "timer": true, "human": true, "mail": true, } if !validTypes[awaitType] { return "", "" } return awaitType, awaitID } var gateShowCmd = &cobra.Command{ Use: "show ", Short: "Show gate details", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { ctx := rootCtx var gate *types.Issue // Try daemon first, fall back to direct store access if daemonClient != nil { resp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: args[0]}) if err != nil { FatalError("gate show: %v", err) } if err := json.Unmarshal(resp.Data, &gate); err != nil { FatalError("failed to parse gate: %v", err) } } else if store != nil { gateID, err := utils.ResolvePartialID(ctx, store, args[0]) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } gate, err = store.GetIssue(ctx, gateID) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } if gate == nil { fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID) os.Exit(1) } if gate.IssueType != types.TypeGate { fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType) os.Exit(1) } } else { fmt.Fprintf(os.Stderr, "Error: no database connection\n") os.Exit(1) } if jsonOutput { outputJSON(gate) return } fmt.Printf("\n%s Gate: %s\n", ui.RenderAccent("🚧"), gate.ID) fmt.Printf(" Title: %s\n", gate.Title) fmt.Printf(" Status: %s\n", gate.Status) fmt.Printf(" Await: %s:%s\n", gate.AwaitType, gate.AwaitID) if gate.Timeout > 0 { elapsed := time.Since(gate.CreatedAt) remaining := gate.Timeout - elapsed if remaining < 0 { remaining = 0 } fmt.Printf(" Timeout: %v (remaining: %v)\n", gate.Timeout, remaining.Round(time.Second)) } if len(gate.Waiters) > 0 { fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", ")) } fmt.Printf(" Created: %s\n", gate.CreatedAt.Format("2006-01-02 15:04:05")) if gate.CloseReason != "" { fmt.Printf(" Close reason: %s\n", gate.CloseReason) } fmt.Println() }, } var gateListCmd = &cobra.Command{ Use: "list", Short: "List open gates", Run: func(cmd *cobra.Command, args []string) { ctx := rootCtx showAll, _ := cmd.Flags().GetBool("all") var issues []*types.Issue // Try daemon first, fall back to direct store access if daemonClient != nil { resp, err := daemonClient.GateList(&rpc.GateListArgs{All: showAll}) if err != nil { FatalError("gate list: %v", err) } if err := json.Unmarshal(resp.Data, &issues); err != nil { FatalError("failed to parse gates: %v", err) } } else if store != nil { // Build filter for gates gateType := types.TypeGate filter := types.IssueFilter{ IssueType: &gateType, } if !showAll { openStatus := types.StatusOpen filter.Status = &openStatus } var err error issues, err = store.SearchIssues(ctx, "", filter) if err != nil { fmt.Fprintf(os.Stderr, "Error listing gates: %v\n", err) os.Exit(1) } } else { fmt.Fprintf(os.Stderr, "Error: no database connection\n") os.Exit(1) } if jsonOutput { outputJSON(issues) return } if len(issues) == 0 { fmt.Println("No gates found") return } fmt.Printf("\n%s Gates (%d):\n\n", ui.RenderAccent("🚧"), len(issues)) for _, gate := range issues { statusIcon := "⏳" if gate.Status == types.StatusClosed { statusIcon = "✓" } awaitSpec := fmt.Sprintf("%s:%s", gate.AwaitType, gate.AwaitID) timeoutInfo := "" if gate.Timeout > 0 { elapsed := time.Since(gate.CreatedAt) remaining := gate.Timeout - elapsed if remaining < 0 { timeoutInfo = " (timed out)" } else { timeoutInfo = fmt.Sprintf(" (%v left)", remaining.Round(time.Second)) } } fmt.Printf(" %s %s: %s%s\n", statusIcon, gate.ID, awaitSpec, timeoutInfo) if len(gate.Waiters) > 0 { fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", ")) } } fmt.Println() }, } var gateCloseCmd = &cobra.Command{ Use: "close ", Short: "Close a gate", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { CheckReadonly("gate close") ctx := rootCtx reason, _ := cmd.Flags().GetString("reason") if reason == "" { reason = "Gate closed" } var closedGate *types.Issue var gateID string // Try daemon first, fall back to direct store access if daemonClient != nil { resp, err := daemonClient.GateClose(&rpc.GateCloseArgs{ ID: args[0], Reason: reason, }) if err != nil { FatalError("gate close: %v", err) } if err := json.Unmarshal(resp.Data, &closedGate); err != nil { FatalError("failed to parse gate: %v", err) } gateID = closedGate.ID } else if store != nil { var err error gateID, err = utils.ResolvePartialID(ctx, store, args[0]) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } // Verify it's a gate gate, err := store.GetIssue(ctx, gateID) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } if gate == nil { fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID) os.Exit(1) } if gate.IssueType != types.TypeGate { fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType) os.Exit(1) } if err := store.CloseIssue(ctx, gateID, reason, actor); err != nil { fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err) os.Exit(1) } markDirtyAndScheduleFlush() closedGate, _ = store.GetIssue(ctx, gateID) } else { fmt.Fprintf(os.Stderr, "Error: no database connection\n") os.Exit(1) } if jsonOutput { outputJSON(closedGate) return } fmt.Printf("%s Closed gate: %s\n", ui.RenderPass("✓"), gateID) fmt.Printf(" Reason: %s\n", reason) }, } var gateWaitCmd = &cobra.Command{ Use: "wait ", Short: "Add a waiter to an existing gate", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { CheckReadonly("gate wait") ctx := rootCtx notifyAddrs, _ := cmd.Flags().GetStringSlice("notify") if len(notifyAddrs) == 0 { fmt.Fprintf(os.Stderr, "Error: --notify is required\n") os.Exit(1) } var addedCount int var gateID string var newWaiters []string // Try daemon first, fall back to direct store access if daemonClient != nil { resp, err := daemonClient.GateWait(&rpc.GateWaitArgs{ ID: args[0], Waiters: notifyAddrs, }) if err != nil { FatalError("gate wait: %v", err) } var result rpc.GateWaitResult if err := json.Unmarshal(resp.Data, &result); err != nil { FatalError("failed to parse gate wait result: %v", err) } addedCount = result.AddedCount gateID = args[0] // Use the input ID for display // For daemon mode, we don't know exactly which waiters were added // Just report the count newWaiters = nil } else if store != nil { var err error gateID, err = utils.ResolvePartialID(ctx, store, args[0]) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } // Get existing gate gate, err := store.GetIssue(ctx, gateID) if err != nil { fmt.Fprintf(os.Stderr, "Error: %v\n", err) os.Exit(1) } if gate == nil { fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID) os.Exit(1) } if gate.IssueType != types.TypeGate { fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType) os.Exit(1) } if gate.Status == types.StatusClosed { fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", gateID) os.Exit(1) } // Add new waiters (avoiding duplicates) waiterSet := make(map[string]bool) for _, w := range gate.Waiters { waiterSet[w] = true } for _, addr := range notifyAddrs { if !waiterSet[addr] { newWaiters = append(newWaiters, addr) waiterSet[addr] = true } } addedCount = len(newWaiters) if addedCount == 0 { fmt.Println("All specified waiters are already registered on this gate") return } // Update waiters - need to use SQLite directly for Waiters field sqliteStore, ok := store.(*sqlite.SQLiteStorage) if !ok { fmt.Fprintf(os.Stderr, "Error: gate wait requires SQLite storage\n") os.Exit(1) } allWaiters := append(gate.Waiters, newWaiters...) waitersJSON, _ := json.Marshal(allWaiters) // Use raw SQL to update the waiters field _, err = sqliteStore.UnderlyingDB().ExecContext(ctx, `UPDATE issues SET waiters = ?, updated_at = ? WHERE id = ?`, string(waitersJSON), time.Now(), gateID) if err != nil { fmt.Fprintf(os.Stderr, "Error adding waiters: %v\n", err) os.Exit(1) } markDirtyAndScheduleFlush() if jsonOutput { updatedGate, _ := store.GetIssue(ctx, gateID) outputJSON(updatedGate) return } } else { fmt.Fprintf(os.Stderr, "Error: no database connection\n") os.Exit(1) } if addedCount == 0 { fmt.Println("All specified waiters are already registered on this gate") return } if jsonOutput { // For daemon mode, output the result outputJSON(map[string]interface{}{"added_count": addedCount, "gate_id": gateID}) return } fmt.Printf("%s Added %d waiter(s) to gate %s\n", ui.RenderPass("✓"), addedCount, gateID) for _, addr := range newWaiters { fmt.Printf(" + %s\n", addr) } }, } func init() { // Gate create flags gateCreateCmd.Flags().String("await", "", "Await spec: gh:run:, gh:pr:, timer:, human:, mail: (required)") gateCreateCmd.Flags().String("timeout", "", "Timeout duration (e.g., 30m, 1h)") gateCreateCmd.Flags().StringSlice("notify", nil, "Mail addresses to notify when gate clears (repeatable)") gateCreateCmd.Flags().String("title", "", "Custom title for the gate") gateCreateCmd.Flags().Bool("json", false, "Output JSON format") // Gate show flags gateShowCmd.Flags().Bool("json", false, "Output JSON format") // Gate list flags gateListCmd.Flags().Bool("all", false, "Show all gates including closed") gateListCmd.Flags().Bool("json", false, "Output JSON format") // Gate close flags gateCloseCmd.Flags().StringP("reason", "r", "", "Reason for closing") gateCloseCmd.Flags().Bool("json", false, "Output JSON format") // Gate wait flags gateWaitCmd.Flags().StringSlice("notify", nil, "Mail addresses to add as waiters (repeatable, required)") gateWaitCmd.Flags().Bool("json", false, "Output JSON format") // Add subcommands to gate command gateCmd.AddCommand(gateCreateCmd) gateCmd.AddCommand(gateShowCmd) gateCmd.AddCommand(gateListCmd) gateCmd.AddCommand(gateCloseCmd) gateCmd.AddCommand(gateWaitCmd) // Add gate command to root rootCmd.AddCommand(gateCmd) }