Files
beads/cmd/bd/gate.go
onyx 3d93166b3f feat(gate): add cross-rig bead gate support (Phase 4)
Add bead gate type for cross-rig bead dependencies:
- await_type=bead with await_id=<rig>:<bead-id> format
- Add `bd gate check` command to evaluate open gates
- Support --type=bead to check only bead gates
- Support --dry-run to preview without closing
- Gate resolves when target bead is closed

Closes bd-w3rh

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-02 13:11:36 -08:00

711 lines
20 KiB
Go

package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"time"
_ "github.com/ncruces/go-sqlite3/driver"
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/spf13/cobra"
"github.com/steveyegge/beads/internal/beads"
"github.com/steveyegge/beads/internal/configfile"
"github.com/steveyegge/beads/internal/routing"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/types"
"github.com/steveyegge/beads/internal/ui"
)
// gateCmd is the parent command for gate operations
var gateCmd = &cobra.Command{
Use: "gate",
GroupID: "issues",
Short: "Manage async coordination gates",
Long: `Gates are async wait conditions that block workflow steps.
Gates are created automatically when a formula step has a gate field.
They must be closed (manually or via watchers) for the blocked step to proceed.
Gate types:
human - Requires manual bd close (Phase 1)
timer - Expires after timeout (Phase 2)
gh:run - Waits for GitHub workflow (Phase 3)
gh:pr - Waits for PR merge (Phase 3)
bead - Waits for cross-rig bead to close (Phase 4)
For bead gates, await_id format is <rig>:<bead-id> (e.g., "gastown:gt-abc123").
Examples:
bd gate list # Show all open gates
bd gate list --all # Show all gates including closed
bd gate check # Evaluate all open gates
bd gate check --type=bead # Evaluate only bead gates
bd gate resolve <id> # Close a gate manually`,
}
// gateListCmd lists gate issues
var gateListCmd = &cobra.Command{
Use: "list",
Short: "List gate issues",
Long: `List all gate issues in the current beads database.
By default, shows only open gates. Use --all to include closed gates.`,
Run: func(cmd *cobra.Command, args []string) {
allFlag, _ := cmd.Flags().GetBool("all")
limit, _ := cmd.Flags().GetInt("limit")
// Build filter for gate type issues
gateType := types.TypeGate
filter := types.IssueFilter{
IssueType: &gateType,
Limit: limit,
}
// By default, exclude closed gates
if !allFlag {
filter.ExcludeStatus = []types.Status{types.StatusClosed}
}
ctx := rootCtx
// If daemon is running, use RPC
if daemonClient != nil {
listArgs := &rpc.ListArgs{
IssueType: "gate",
Limit: limit,
}
if !allFlag {
listArgs.ExcludeStatus = []string{"closed"}
}
resp, err := daemonClient.List(listArgs)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
var issues []*types.Issue
if err := json.Unmarshal(resp.Data, &issues); err != nil {
fmt.Fprintf(os.Stderr, "Error parsing response: %v\n", err)
os.Exit(1)
}
if jsonOutput {
outputJSON(issues)
return
}
displayGates(issues)
return
}
// Direct mode
issues, err := store.SearchIssues(ctx, "", filter)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
if jsonOutput {
outputJSON(issues)
return
}
displayGates(issues)
},
}
// displayGates formats and displays gate issues
func displayGates(gates []*types.Issue) {
if len(gates) == 0 {
fmt.Println("No gates found.")
return
}
fmt.Printf("\n%s Open Gates (%d):\n\n", ui.RenderAccent("⏳"), len(gates))
for _, gate := range gates {
statusSym := "○"
if gate.Status == types.StatusClosed {
statusSym = "●"
}
// Format gate info
gateInfo := gate.AwaitType
if gate.AwaitID != "" {
gateInfo = fmt.Sprintf("%s %s", gate.AwaitType, gate.AwaitID)
}
// Format timeout if present
timeoutStr := ""
if gate.Timeout > 0 {
timeoutStr = fmt.Sprintf(" (timeout: %s)", gate.Timeout)
}
// Find blocked step from ID (gate ID format: parent.gate-stepid)
blockedStep := ""
if strings.Contains(gate.ID, ".gate-") {
parts := strings.Split(gate.ID, ".gate-")
if len(parts) == 2 {
blockedStep = fmt.Sprintf("%s.%s", parts[0], parts[1])
}
}
fmt.Printf("%s %s - %s%s\n", statusSym, ui.RenderID(gate.ID), gateInfo, timeoutStr)
if blockedStep != "" {
fmt.Printf(" Blocks: %s\n", blockedStep)
}
fmt.Println()
}
fmt.Printf("To resolve a gate: bd close <gate-id>\n")
}
// gateResolveCmd manually closes a gate
var gateResolveCmd = &cobra.Command{
Use: "resolve <gate-id>",
Short: "Manually resolve (close) a gate",
Long: `Close a gate issue to unblock the step waiting on it.
This is equivalent to 'bd close <gate-id>' but with a more explicit name.
Use --reason to provide context for why the gate was resolved.`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
CheckReadonly("gate resolve")
gateID := args[0]
reason, _ := cmd.Flags().GetString("reason")
// Verify it's a gate issue
ctx := rootCtx
var issue *types.Issue
var err error
if daemonClient != nil {
resp, rerr := daemonClient.Show(&rpc.ShowArgs{ID: gateID})
if rerr != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", rerr)
os.Exit(1)
}
if !resp.Success {
fmt.Fprintf(os.Stderr, "Error: %s\n", resp.Error)
os.Exit(1)
}
var details types.IssueDetails
if uerr := json.Unmarshal(resp.Data, &details); uerr != nil {
fmt.Fprintf(os.Stderr, "Error parsing response: %v\n", uerr)
os.Exit(1)
}
issue = &details.Issue
} else {
issue, err = store.GetIssue(ctx, gateID)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: gate not found: %s\n", gateID)
os.Exit(1)
}
}
if issue.IssueType != types.TypeGate {
fmt.Fprintf(os.Stderr, "Error: %s is not a gate issue (type=%s)\n", gateID, issue.IssueType)
os.Exit(1)
}
// Close the gate
if daemonClient != nil {
closeArgs := &rpc.CloseArgs{
ID: gateID,
Reason: reason,
}
resp, cerr := daemonClient.CloseIssue(closeArgs)
if cerr != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", cerr)
os.Exit(1)
}
if !resp.Success {
fmt.Fprintf(os.Stderr, "Error: %s\n", resp.Error)
os.Exit(1)
}
} else {
if err := store.CloseIssue(ctx, gateID, reason, actor, ""); err != nil {
fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err)
os.Exit(1)
}
markDirtyAndScheduleFlush()
}
fmt.Printf("%s Gate resolved: %s\n", ui.RenderPass("✓"), gateID)
if reason != "" {
fmt.Printf(" Reason: %s\n", reason)
}
},
}
// gateCheckCmd evaluates gates and closes those that are resolved
var gateCheckCmd = &cobra.Command{
Use: "check",
Short: "Evaluate gates and close resolved ones",
Long: `Evaluate gate conditions and automatically close resolved gates.
By default, checks all open gates. Use --type to filter by gate type.
Gate types:
gh - Check all GitHub gates (gh:run and gh:pr)
gh:run - Check GitHub Actions workflow runs
gh:pr - Check pull request merge status
timer - Check timer gates (auto-expire based on timeout)
bead - Check cross-rig bead gates
all - Check all gate types
GitHub gates use the 'gh' CLI to query status:
- gh:run checks 'gh run view <id> --json status,conclusion'
- gh:pr checks 'gh pr view <id> --json state,merged'
A gate is resolved when:
- gh:run: status=completed AND conclusion=success
- gh:pr: state=MERGED
- timer: current time > created_at + timeout
- bead: target bead status=closed
A gate is escalated when:
- gh:run: status=completed AND conclusion in (failure, cancelled)
- gh:pr: state=CLOSED AND merged=false
Examples:
bd gate check # Check all gates
bd gate check --type=gh # Check only GitHub gates
bd gate check --type=gh:run # Check only workflow run gates
bd gate check --type=timer # Check only timer gates
bd gate check --type=bead # Check only cross-rig bead gates
bd gate check --dry-run # Show what would happen without changes
bd gate check --escalate # Escalate expired/failed gates`,
Run: func(cmd *cobra.Command, args []string) {
CheckReadonly("gate check")
gateTypeFilter, _ := cmd.Flags().GetString("type")
dryRun, _ := cmd.Flags().GetBool("dry-run")
escalateFlag, _ := cmd.Flags().GetBool("escalate")
limit, _ := cmd.Flags().GetInt("limit")
// Get open gates
gateType := types.TypeGate
filter := types.IssueFilter{
IssueType: &gateType,
ExcludeStatus: []types.Status{types.StatusClosed},
Limit: limit,
}
ctx := rootCtx
var gates []*types.Issue
var err error
if daemonClient != nil {
listArgs := &rpc.ListArgs{
IssueType: "gate",
ExcludeStatus: []string{"closed"},
Limit: limit,
}
resp, rerr := daemonClient.List(listArgs)
if rerr != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", rerr)
os.Exit(1)
}
if uerr := json.Unmarshal(resp.Data, &gates); uerr != nil {
fmt.Fprintf(os.Stderr, "Error parsing response: %v\n", uerr)
os.Exit(1)
}
} else {
gates, err = store.SearchIssues(ctx, "", filter)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
// Filter by type if specified
var filteredGates []*types.Issue
for _, gate := range gates {
if shouldCheckGate(gate, gateTypeFilter) {
filteredGates = append(filteredGates, gate)
}
}
if len(filteredGates) == 0 {
if gateTypeFilter != "" {
fmt.Printf("No open gates of type '%s' found.\n", gateTypeFilter)
} else {
fmt.Println("No open gates found.")
}
return
}
// Results tracking
type checkResult struct {
gate *types.Issue
resolved bool
escalated bool
reason string
err error
}
results := make([]checkResult, 0, len(filteredGates))
// Check each gate
now := time.Now()
for _, gate := range filteredGates {
result := checkResult{gate: gate}
switch {
case strings.HasPrefix(gate.AwaitType, "gh:run"):
result.resolved, result.escalated, result.reason, result.err = checkGHRun(gate)
case strings.HasPrefix(gate.AwaitType, "gh:pr"):
result.resolved, result.escalated, result.reason, result.err = checkGHPR(gate)
case gate.AwaitType == "timer":
result.resolved, result.escalated, result.reason, result.err = checkTimer(gate, now)
case gate.AwaitType == "bead":
result.resolved, result.reason = checkBeadGate(ctx, gate.AwaitID)
default:
// Skip unsupported gate types (human gates need manual resolution)
continue
}
results = append(results, result)
}
// Process results
resolvedCount := 0
escalatedCount := 0
errorCount := 0
for _, r := range results {
if r.err != nil {
errorCount++
fmt.Fprintf(os.Stderr, "%s %s: error checking - %v\n",
ui.RenderFail("✗"), r.gate.ID, r.err)
continue
}
if r.resolved {
resolvedCount++
if dryRun {
fmt.Printf("%s %s: would resolve - %s\n",
ui.RenderPass("✓"), r.gate.ID, r.reason)
} else {
// Close the gate
closeErr := closeGate(ctx, r.gate.ID, r.reason)
if closeErr != nil {
fmt.Fprintf(os.Stderr, "%s %s: error closing - %v\n",
ui.RenderFail("✗"), r.gate.ID, closeErr)
errorCount++
} else {
fmt.Printf("%s %s: resolved - %s\n",
ui.RenderPass("✓"), r.gate.ID, r.reason)
}
}
} else if r.escalated {
escalatedCount++
if dryRun {
fmt.Printf("%s %s: would escalate - %s\n",
ui.RenderWarn("⚠"), r.gate.ID, r.reason)
} else {
fmt.Printf("%s %s: ESCALATE - %s\n",
ui.RenderWarn("⚠"), r.gate.ID, r.reason)
// Actually escalate if flag is set
if escalateFlag {
escalateGate(r.gate, r.reason)
}
}
} else {
// Still pending
fmt.Printf("%s %s: pending - %s\n",
ui.RenderAccent("○"), r.gate.ID, r.reason)
}
}
// Summary
fmt.Println()
fmt.Printf("Checked %d gates: %d resolved, %d escalated, %d errors\n",
len(results), resolvedCount, escalatedCount, errorCount)
if jsonOutput {
summary := map[string]interface{}{
"checked": len(results),
"resolved": resolvedCount,
"escalated": escalatedCount,
"errors": errorCount,
"dry_run": dryRun,
}
outputJSON(summary)
}
},
}
// shouldCheckGate returns true if the gate matches the type filter
func shouldCheckGate(gate *types.Issue, typeFilter string) bool {
if typeFilter == "" || typeFilter == "all" {
return true
}
if typeFilter == "gh" {
return strings.HasPrefix(gate.AwaitType, "gh:")
}
return gate.AwaitType == typeFilter
}
// ghRunStatus holds the JSON response from 'gh run view'
type ghRunStatus struct {
Status string `json:"status"`
Conclusion string `json:"conclusion"`
Name string `json:"name"`
}
// ghPRStatus holds the JSON response from 'gh pr view'
type ghPRStatus struct {
State string `json:"state"`
Merged bool `json:"merged"`
Title string `json:"title"`
}
// checkGHRun checks a GitHub Actions workflow run gate
func checkGHRun(gate *types.Issue) (resolved, escalated bool, reason string, err error) {
if gate.AwaitID == "" {
return false, false, "no run ID specified", nil
}
// Run: gh run view <id> --json status,conclusion,name
cmd := exec.Command("gh", "run", "view", gate.AwaitID, "--json", "status,conclusion,name")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if runErr := cmd.Run(); runErr != nil {
// Check if gh CLI is not found
if strings.Contains(stderr.String(), "command not found") ||
strings.Contains(runErr.Error(), "executable file not found") {
return false, false, "", fmt.Errorf("gh CLI not installed")
}
// Check if run not found
if strings.Contains(stderr.String(), "not found") {
return false, true, "workflow run not found", nil
}
return false, false, "", fmt.Errorf("gh run view failed: %s", stderr.String())
}
var status ghRunStatus
if parseErr := json.Unmarshal(stdout.Bytes(), &status); parseErr != nil {
return false, false, "", fmt.Errorf("failed to parse gh output: %w", parseErr)
}
// Evaluate status
switch status.Status {
case "completed":
switch status.Conclusion {
case "success":
return true, false, fmt.Sprintf("workflow '%s' succeeded", status.Name), nil
case "failure":
return false, true, fmt.Sprintf("workflow '%s' failed", status.Name), nil
case "cancelled":
return false, true, fmt.Sprintf("workflow '%s' was cancelled", status.Name), nil
case "skipped":
return true, false, fmt.Sprintf("workflow '%s' was skipped", status.Name), nil
default:
return false, true, fmt.Sprintf("workflow '%s' concluded with %s", status.Name, status.Conclusion), nil
}
case "in_progress", "queued", "pending", "waiting":
return false, false, fmt.Sprintf("workflow '%s' is %s", status.Name, status.Status), nil
default:
return false, false, fmt.Sprintf("workflow '%s' status: %s", status.Name, status.Status), nil
}
}
// checkGHPR checks a GitHub pull request gate
func checkGHPR(gate *types.Issue) (resolved, escalated bool, reason string, err error) {
if gate.AwaitID == "" {
return false, false, "no PR number specified", nil
}
// Run: gh pr view <id> --json state,merged,title
cmd := exec.Command("gh", "pr", "view", gate.AwaitID, "--json", "state,merged,title")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
if runErr := cmd.Run(); runErr != nil {
// Check if gh CLI is not found
if strings.Contains(stderr.String(), "command not found") ||
strings.Contains(runErr.Error(), "executable file not found") {
return false, false, "", fmt.Errorf("gh CLI not installed")
}
// Check if PR not found
if strings.Contains(stderr.String(), "not found") || strings.Contains(stderr.String(), "Could not resolve") {
return false, true, "pull request not found", nil
}
return false, false, "", fmt.Errorf("gh pr view failed: %s", stderr.String())
}
var status ghPRStatus
if parseErr := json.Unmarshal(stdout.Bytes(), &status); parseErr != nil {
return false, false, "", fmt.Errorf("failed to parse gh output: %w", parseErr)
}
// Evaluate status
switch status.State {
case "MERGED":
return true, false, fmt.Sprintf("PR '%s' was merged", status.Title), nil
case "CLOSED":
if status.Merged {
return true, false, fmt.Sprintf("PR '%s' was merged", status.Title), nil
}
return false, true, fmt.Sprintf("PR '%s' was closed without merging", status.Title), nil
case "OPEN":
return false, false, fmt.Sprintf("PR '%s' is still open", status.Title), nil
default:
return false, false, fmt.Sprintf("PR '%s' state: %s", status.Title, status.State), nil
}
}
// checkTimer checks a timer gate for expiration
func checkTimer(gate *types.Issue, now time.Time) (resolved, escalated bool, reason string, err error) {
if gate.Timeout == 0 {
return false, false, "timer gate without timeout configured", fmt.Errorf("no timeout set")
}
expiresAt := gate.CreatedAt.Add(gate.Timeout)
if now.After(expiresAt) {
expired := now.Sub(expiresAt).Round(time.Second)
return true, false, fmt.Sprintf("timer expired %s ago", expired), nil
}
remaining := expiresAt.Sub(now).Round(time.Second)
return false, false, fmt.Sprintf("expires in %s", remaining), nil
}
// checkBeadGate checks if a cross-rig bead gate is satisfied.
// await_id format: <rig>:<bead-id> (e.g., "gastown:gt-abc123")
// Returns (satisfied, reason).
func checkBeadGate(ctx context.Context, awaitID string) (bool, string) {
// Parse await_id format: <rig>:<bead-id>
parts := strings.SplitN(awaitID, ":", 2)
if len(parts) != 2 {
return false, fmt.Sprintf("invalid await_id format: expected <rig>:<bead-id>, got %q", awaitID)
}
rigName := parts[0]
beadID := parts[1]
if rigName == "" || beadID == "" {
return false, "await_id missing rig name or bead ID"
}
// Resolve the target rig's beads directory
currentBeadsDir := beads.FindBeadsDir()
if currentBeadsDir == "" {
return false, "could not find current beads directory"
}
targetBeadsDir, _, err := routing.ResolveBeadsDirForRig(rigName, currentBeadsDir)
if err != nil {
return false, fmt.Sprintf("rig %q not found: %v", rigName, err)
}
// Load config to get database path
cfg, err := configfile.Load(targetBeadsDir)
if err != nil {
return false, fmt.Sprintf("failed to load config for rig %q: %v", rigName, err)
}
dbPath := cfg.DatabasePath(targetBeadsDir)
// Open the target database (read-only)
db, err := sql.Open("sqlite3", dbPath+"?mode=ro")
if err != nil {
return false, fmt.Sprintf("failed to open database for rig %q: %v", rigName, err)
}
defer func() { _ = db.Close() }()
// Check if the target bead exists and is closed
var status string
err = db.QueryRowContext(ctx, `
SELECT status FROM issues WHERE id = ?
`, beadID).Scan(&status)
if err != nil {
if err == sql.ErrNoRows {
return false, fmt.Sprintf("bead %s not found in rig %s", beadID, rigName)
}
return false, fmt.Sprintf("database query failed: %v", err)
}
if status == string(types.StatusClosed) {
return true, fmt.Sprintf("target bead %s is closed", beadID)
}
return false, fmt.Sprintf("target bead %s status is %q (waiting for closed)", beadID, status)
}
// closeGate closes a gate issue with the given reason
func closeGate(ctx interface{}, gateID, reason string) error {
if daemonClient != nil {
closeArgs := &rpc.CloseArgs{
ID: gateID,
Reason: reason,
}
resp, err := daemonClient.CloseIssue(closeArgs)
if err != nil {
return err
}
if !resp.Success {
return fmt.Errorf("%s", resp.Error)
}
return nil
}
if err := store.CloseIssue(rootCtx, gateID, reason, actor, ""); err != nil {
return err
}
markDirtyAndScheduleFlush()
return nil
}
// escalateGate sends an escalation for a failed/expired gate
func escalateGate(gate *types.Issue, reason string) {
topic := fmt.Sprintf("Gate escalation: %s", gate.ID)
message := fmt.Sprintf("Gate %s needs attention.\nType: %s\nReason: %s\nCreated: %s",
gate.ID,
gate.AwaitType,
reason,
gate.CreatedAt.Format(time.RFC3339))
// Call gt escalate if available
escalateCmd := exec.Command("gt", "escalate", topic, "-s", "HIGH", "-m", message)
escalateCmd.Stdout = os.Stdout
escalateCmd.Stderr = os.Stderr
if err := escalateCmd.Run(); err != nil {
fmt.Fprintf(os.Stderr, "Warning: escalation failed for %s: %v\n", gate.ID, err)
}
}
func init() {
// gate list flags
gateListCmd.Flags().BoolP("all", "a", false, "Show all gates including closed")
gateListCmd.Flags().IntP("limit", "n", 50, "Limit results (default 50)")
// gate resolve flags
gateResolveCmd.Flags().StringP("reason", "r", "", "Reason for resolving the gate")
// gate check flags
gateCheckCmd.Flags().StringP("type", "t", "", "Gate type to check (gh, gh:run, gh:pr, timer, bead, all)")
gateCheckCmd.Flags().Bool("dry-run", false, "Show what would happen without making changes")
gateCheckCmd.Flags().BoolP("escalate", "e", false, "Escalate failed/expired gates")
gateCheckCmd.Flags().IntP("limit", "l", 100, "Limit results (default 100)")
// Add subcommands
gateCmd.AddCommand(gateListCmd)
gateCmd.AddCommand(gateResolveCmd)
gateCmd.AddCommand(gateCheckCmd)
rootCmd.AddCommand(gateCmd)
}