Adds closed_by_session tracking for entity CV building per Gas Town decision 009-session-events-architecture.md. Changes: - Add ClosedBySession field to Issue struct - Add closed_by_session column to issues table (migration 034) - Add --session flag to bd close command - Support CLAUDE_SESSION_ID env var as fallback - Add --session flag to bd update for status=closed - Display closed_by_session in bd show output - Update Storage interface to include session parameter in CloseIssue 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> Executed-By: beads/crew/dave Rig: beads Role: crew
1049 lines
30 KiB
Go
1049 lines
30 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
"github.com/steveyegge/beads/internal/rpc"
|
|
"github.com/steveyegge/beads/internal/storage"
|
|
"github.com/steveyegge/beads/internal/storage/sqlite"
|
|
"github.com/steveyegge/beads/internal/types"
|
|
"github.com/steveyegge/beads/internal/ui"
|
|
"github.com/steveyegge/beads/internal/utils"
|
|
)
|
|
|
|
// Gate commands - async coordination primitives for agent workflows
|
|
//
|
|
// Gates are wisp issues that block until external conditions are met.
|
|
// They enable agents to wait on:
|
|
// - GitHub Actions run completion (gh:run:<id>)
|
|
// - Pull request merge/close (gh:pr:<id>)
|
|
// - Simple timer delay (timer:<duration>)
|
|
// - Human approval (human:<prompt>)
|
|
// - Mail matching a pattern (mail:<pattern>)
|
|
//
|
|
// Usage:
|
|
// bd gate create --await gh:run:123 --timeout 30m --notify beads/dave
|
|
// bd gate show <id>
|
|
// bd gate list
|
|
// bd gate close <id> --reason "completed"
|
|
// bd gate wait <id> --notify beads/alice
// bd gate approve <id> --comment "looks good"
// bd gate eval --dry-run
|
|
|
|
var gateCmd = &cobra.Command{
|
|
Use: "gate",
|
|
Short: "Gate commands (async coordination)",
|
|
Long: `Manage gates - async coordination primitives for agent workflows.
|
|
|
|
Gates are ephemeral (wisp) issues that block until external conditions are met.
|
|
The orchestrator is responsible for monitoring and closing gates.
|
|
|
|
Await types:
|
|
gh:run:<id> Wait for GitHub Actions run to complete
|
|
gh:pr:<id> Wait for pull request to be merged/closed
|
|
timer:<duration> Simple delay (e.g., timer:30m, timer:1h)
|
|
human:<prompt> Wait for human approval
|
|
mail:<pattern> Wait for mail matching pattern
|
|
|
|
Commands:
|
|
create Create a new gate
|
|
show Show gate details
|
|
list List open gates
|
|
close Close a gate
|
|
wait Add a waiter to an existing gate`,
|
|
}
|
|
|
|
var gateCreateCmd = &cobra.Command{
|
|
Use: "create",
|
|
Short: "Create a new gate",
|
|
Long: `Create a new gate to wait on an external condition.
|
|
|
|
The gate will be created as a wisp issue (ephemeral). The orchestrator
|
|
is responsible for monitoring gates and closing them when conditions are met.
|
|
|
|
Examples:
|
|
bd gate create --await gh:run:123456789 --timeout 30m --notify beads/dave
|
|
bd gate create --await gh:pr:42 --timeout 1h --notify beads/dave beads/alice
|
|
bd gate create --await timer:15m --notify beads/dave
|
|
bd gate create --await human:approve-deploy --timeout 4h --notify beads/dave`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate create")
|
|
ctx := rootCtx
|
|
|
|
awaitSpec, _ := cmd.Flags().GetString("await")
|
|
timeoutStr, _ := cmd.Flags().GetString("timeout")
|
|
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
|
|
title, _ := cmd.Flags().GetString("title")
|
|
|
|
if awaitSpec == "" {
|
|
fmt.Fprintf(os.Stderr, "Error: --await is required\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Parse await spec into type and ID
|
|
awaitType, awaitID := parseAwaitSpec(awaitSpec)
|
|
if awaitType == "" {
|
|
fmt.Fprintf(os.Stderr, "Error: invalid await spec %q, expected format: type:id\n", awaitSpec)
|
|
fmt.Fprintf(os.Stderr, "Valid types: gh:run, gh:pr, timer, human, mail\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Parse timeout
|
|
var timeout time.Duration
|
|
if timeoutStr != "" {
|
|
var err error
|
|
timeout, err = time.ParseDuration(timeoutStr)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: invalid timeout %q: %v\n", timeoutStr, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// For timer gates, the await_id IS the duration - use it as timeout if not explicitly set
|
|
if awaitType == "timer" && timeout == 0 {
|
|
var err error
|
|
timeout, err = time.ParseDuration(awaitID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: invalid timer duration %q: %v\n", awaitID, err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
// Generate title if not provided
|
|
if title == "" {
|
|
title = fmt.Sprintf("Gate: %s:%s", awaitType, awaitID)
|
|
}
|
|
|
|
var gate *types.Issue
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateCreate(&rpc.GateCreateArgs{
|
|
Title: title,
|
|
AwaitType: awaitType,
|
|
AwaitID: awaitID,
|
|
Timeout: timeout,
|
|
Waiters: notifyAddrs,
|
|
})
|
|
if err != nil {
|
|
FatalError("gate create: %v", err)
|
|
}
|
|
|
|
// Parse the gate ID from response and fetch full gate
|
|
var result rpc.GateCreateResult
|
|
if err := json.Unmarshal(resp.Data, &result); err != nil {
|
|
FatalError("failed to parse gate create result: %v", err)
|
|
}
|
|
|
|
// Get the full gate for output
|
|
showResp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: result.ID})
|
|
if err != nil {
|
|
FatalError("failed to fetch created gate: %v", err)
|
|
}
|
|
if err := json.Unmarshal(showResp.Data, &gate); err != nil {
|
|
FatalError("failed to parse gate: %v", err)
|
|
}
|
|
} else if store != nil {
|
|
now := time.Now()
|
|
gate = &types.Issue{
|
|
// ID will be generated by CreateIssue
|
|
Title: title,
|
|
IssueType: types.TypeGate,
|
|
Status: types.StatusOpen,
|
|
Priority: 1, // Gates are typically high priority
|
|
// Assignee left empty - orchestrator decides who processes gates
|
|
Ephemeral: true, // Gates are wisps (ephemeral)
|
|
AwaitType: awaitType,
|
|
AwaitID: awaitID,
|
|
Timeout: timeout,
|
|
Waiters: notifyAddrs,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
gate.ContentHash = gate.ComputeContentHash()
|
|
|
|
if err := store.CreateIssue(ctx, gate, actor); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error creating gate: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(gate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Created gate: %s\n", ui.RenderPass("✓"), gate.ID)
|
|
fmt.Printf(" Await: %s:%s\n", awaitType, awaitID)
|
|
if timeout > 0 {
|
|
fmt.Printf(" Timeout: %v\n", timeout)
|
|
}
|
|
if len(notifyAddrs) > 0 {
|
|
fmt.Printf(" Notify: %s\n", strings.Join(notifyAddrs, ", "))
|
|
}
|
|
},
|
|
}
|
|
|
|
// parseAwaitSpec splits an await specification such as "gh:run:123" or
// "timer:30m" into its await type and await ID.
//
// Recognized types are the compound "gh:run" and "gh:pr" and the simple
// "timer", "human" and "mail". Everything after the colon that terminates the
// type is returned verbatim as the ID, so the ID may itself contain colons.
//
// It returns ("", "") when the spec has no colon, names an unknown type, or
// carries an empty ID (for example "timer:" or "gh:run:"); a gate without an
// ID has nothing to wait on.
func parseAwaitSpec(spec string) (string, string) {
	var awaitType, awaitID string

	switch {
	// Compound types are matched first because the type itself contains a colon.
	case strings.HasPrefix(spec, "gh:run:"):
		awaitType, awaitID = "gh:run", strings.TrimPrefix(spec, "gh:run:")
	case strings.HasPrefix(spec, "gh:pr:"):
		awaitType, awaitID = "gh:pr", strings.TrimPrefix(spec, "gh:pr:")
	default:
		// Simple types: split once, at the first colon.
		sep := strings.Index(spec, ":")
		if sep < 0 {
			return "", ""
		}
		switch spec[:sep] {
		case "timer", "human", "mail":
			awaitType, awaitID = spec[:sep], spec[sep+1:]
		default:
			return "", ""
		}
	}

	if awaitID == "" {
		return "", ""
	}
	return awaitType, awaitID
}
|
|
|
|
var gateShowCmd = &cobra.Command{
|
|
Use: "show <gate-id>",
|
|
Short: "Show gate details",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
ctx := rootCtx
|
|
|
|
var gate *types.Issue
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: args[0]})
|
|
if err != nil {
|
|
FatalError("gate show: %v", err)
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &gate); err != nil {
|
|
FatalError("failed to parse gate: %v", err)
|
|
}
|
|
} else if store != nil {
|
|
gateID, err := utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
gate, err = store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(gate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("\n%s Gate: %s\n", ui.RenderAccent("🚧"), gate.ID)
|
|
fmt.Printf(" Title: %s\n", gate.Title)
|
|
fmt.Printf(" Status: %s\n", gate.Status)
|
|
fmt.Printf(" Await: %s:%s\n", gate.AwaitType, gate.AwaitID)
|
|
if gate.Timeout > 0 {
|
|
elapsed := time.Since(gate.CreatedAt)
|
|
remaining := gate.Timeout - elapsed
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
fmt.Printf(" Timeout: %v (remaining: %v)\n", gate.Timeout, remaining.Round(time.Second))
|
|
}
|
|
if len(gate.Waiters) > 0 {
|
|
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
|
|
}
|
|
fmt.Printf(" Created: %s\n", gate.CreatedAt.Format("2006-01-02 15:04:05"))
|
|
if gate.CloseReason != "" {
|
|
fmt.Printf(" Close reason: %s\n", gate.CloseReason)
|
|
}
|
|
fmt.Println()
|
|
},
|
|
}
|
|
|
|
var gateListCmd = &cobra.Command{
|
|
Use: "list",
|
|
Short: "List open gates",
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
ctx := rootCtx
|
|
showAll, _ := cmd.Flags().GetBool("all")
|
|
|
|
var issues []*types.Issue
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateList(&rpc.GateListArgs{All: showAll})
|
|
if err != nil {
|
|
FatalError("gate list: %v", err)
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &issues); err != nil {
|
|
FatalError("failed to parse gates: %v", err)
|
|
}
|
|
} else if store != nil {
|
|
// Build filter for gates
|
|
gateType := types.TypeGate
|
|
filter := types.IssueFilter{
|
|
IssueType: &gateType,
|
|
}
|
|
if !showAll {
|
|
openStatus := types.StatusOpen
|
|
filter.Status = &openStatus
|
|
}
|
|
|
|
var err error
|
|
issues, err = store.SearchIssues(ctx, "", filter)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error listing gates: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(issues)
|
|
return
|
|
}
|
|
|
|
if len(issues) == 0 {
|
|
fmt.Println("No gates found")
|
|
return
|
|
}
|
|
|
|
fmt.Printf("\n%s Gates (%d):\n\n", ui.RenderAccent("🚧"), len(issues))
|
|
for _, gate := range issues {
|
|
statusIcon := "⏳"
|
|
if gate.Status == types.StatusClosed {
|
|
statusIcon = "✓"
|
|
}
|
|
awaitSpec := fmt.Sprintf("%s:%s", gate.AwaitType, gate.AwaitID)
|
|
timeoutInfo := ""
|
|
if gate.Timeout > 0 {
|
|
elapsed := time.Since(gate.CreatedAt)
|
|
remaining := gate.Timeout - elapsed
|
|
if remaining < 0 {
|
|
timeoutInfo = " (timed out)"
|
|
} else {
|
|
timeoutInfo = fmt.Sprintf(" (%v left)", remaining.Round(time.Second))
|
|
}
|
|
}
|
|
fmt.Printf(" %s %s: %s%s\n", statusIcon, gate.ID, awaitSpec, timeoutInfo)
|
|
if len(gate.Waiters) > 0 {
|
|
fmt.Printf(" Waiters: %s\n", strings.Join(gate.Waiters, ", "))
|
|
}
|
|
}
|
|
fmt.Println()
|
|
},
|
|
}
|
|
|
|
var gateCloseCmd = &cobra.Command{
|
|
Use: "close <gate-id>",
|
|
Short: "Close a gate",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate close")
|
|
ctx := rootCtx
|
|
reason, _ := cmd.Flags().GetString("reason")
|
|
if reason == "" {
|
|
reason = "Gate closed"
|
|
}
|
|
|
|
var closedGate *types.Issue
|
|
var gateID string
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateClose(&rpc.GateCloseArgs{
|
|
ID: args[0],
|
|
Reason: reason,
|
|
})
|
|
if err != nil {
|
|
FatalError("gate close: %v", err)
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &closedGate); err != nil {
|
|
FatalError("failed to parse gate: %v", err)
|
|
}
|
|
gateID = closedGate.ID
|
|
} else if store != nil {
|
|
var err error
|
|
gateID, err = utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Verify it's a gate
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if err := store.CloseIssue(ctx, gateID, reason, actor, ""); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
closedGate, _ = store.GetIssue(ctx, gateID)
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(closedGate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Closed gate: %s\n", ui.RenderPass("✓"), gateID)
|
|
fmt.Printf(" Reason: %s\n", reason)
|
|
},
|
|
}
|
|
|
|
var gateApproveCmd = &cobra.Command{
|
|
Use: "approve <gate-id>",
|
|
Short: "Approve a human gate",
|
|
Long: `Approve a human gate, closing it and notifying waiters.
|
|
|
|
Human gates (created with --await human:<prompt>) require explicit approval
|
|
to close. This is the command that provides that approval.
|
|
|
|
Example:
|
|
bd gate create --await human:approve-deploy --notify gastown/witness
|
|
# ... later, when ready to approve ...
|
|
bd gate approve <gate-id>
|
|
bd gate approve <gate-id> --comment "Reviewed and approved by Steve"`,
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate approve")
|
|
ctx := rootCtx
|
|
comment, _ := cmd.Flags().GetString("comment")
|
|
|
|
var closedGate *types.Issue
|
|
var gateID string
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
// First get the gate to verify it's a human gate
|
|
showResp, err := daemonClient.GateShow(&rpc.GateShowArgs{ID: args[0]})
|
|
if err != nil {
|
|
FatalError("gate approve: %v", err)
|
|
}
|
|
var gate types.Issue
|
|
if err := json.Unmarshal(showResp.Data, &gate); err != nil {
|
|
FatalError("failed to parse gate: %v", err)
|
|
}
|
|
if gate.AwaitType != "human" {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a human gate (type: %s:%s)\n", args[0], gate.AwaitType, gate.AwaitID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.Status == types.StatusClosed {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", args[0])
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Close with approval reason
|
|
reason := fmt.Sprintf("Human approval granted: %s", gate.AwaitID)
|
|
if comment != "" {
|
|
reason = fmt.Sprintf("Human approval granted: %s (%s)", gate.AwaitID, comment)
|
|
}
|
|
|
|
resp, err := daemonClient.GateClose(&rpc.GateCloseArgs{
|
|
ID: args[0],
|
|
Reason: reason,
|
|
})
|
|
if err != nil {
|
|
FatalError("gate approve: %v", err)
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &closedGate); err != nil {
|
|
FatalError("failed to parse gate: %v", err)
|
|
}
|
|
gateID = closedGate.ID
|
|
} else if store != nil {
|
|
var err error
|
|
gateID, err = utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Get gate and verify it's a human gate
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
if gate.AwaitType != "human" {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a human gate (type: %s:%s)\n", gateID, gate.AwaitType, gate.AwaitID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.Status == types.StatusClosed {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Close with approval reason
|
|
reason := fmt.Sprintf("Human approval granted: %s", gate.AwaitID)
|
|
if comment != "" {
|
|
reason = fmt.Sprintf("Human approval granted: %s (%s)", gate.AwaitID, comment)
|
|
}
|
|
|
|
if err := store.CloseIssue(ctx, gateID, reason, actor, ""); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error closing gate: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
closedGate, _ = store.GetIssue(ctx, gateID)
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(closedGate)
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Approved gate: %s\n", ui.RenderPass("✓"), gateID)
|
|
if closedGate != nil && closedGate.CloseReason != "" {
|
|
fmt.Printf(" %s\n", closedGate.CloseReason)
|
|
}
|
|
if closedGate != nil && len(closedGate.Waiters) > 0 {
|
|
fmt.Printf(" Waiters notified: %s\n", strings.Join(closedGate.Waiters, ", "))
|
|
}
|
|
},
|
|
}
|
|
|
|
var gateWaitCmd = &cobra.Command{
|
|
Use: "wait <gate-id>",
|
|
Short: "Add a waiter to an existing gate",
|
|
Args: cobra.ExactArgs(1),
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate wait")
|
|
ctx := rootCtx
|
|
notifyAddrs, _ := cmd.Flags().GetStringSlice("notify")
|
|
|
|
if len(notifyAddrs) == 0 {
|
|
fmt.Fprintf(os.Stderr, "Error: --notify is required\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
var addedCount int
|
|
var gateID string
|
|
var newWaiters []string
|
|
|
|
// Try daemon first, fall back to direct store access
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateWait(&rpc.GateWaitArgs{
|
|
ID: args[0],
|
|
Waiters: notifyAddrs,
|
|
})
|
|
if err != nil {
|
|
FatalError("gate wait: %v", err)
|
|
}
|
|
var result rpc.GateWaitResult
|
|
if err := json.Unmarshal(resp.Data, &result); err != nil {
|
|
FatalError("failed to parse gate wait result: %v", err)
|
|
}
|
|
addedCount = result.AddedCount
|
|
gateID = args[0] // Use the input ID for display
|
|
// For daemon mode, we don't know exactly which waiters were added
|
|
// Just report the count
|
|
newWaiters = nil
|
|
} else if store != nil {
|
|
var err error
|
|
gateID, err = utils.ResolvePartialID(ctx, store, args[0])
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Get existing gate
|
|
gate, err := store.GetIssue(ctx, gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
if gate == nil {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s not found\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
if gate.IssueType != types.TypeGate {
|
|
fmt.Fprintf(os.Stderr, "Error: %s is not a gate (type: %s)\n", gateID, gate.IssueType)
|
|
os.Exit(1)
|
|
}
|
|
if gate.Status == types.StatusClosed {
|
|
fmt.Fprintf(os.Stderr, "Error: gate %s is already closed\n", gateID)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Add new waiters (avoiding duplicates)
|
|
waiterSet := make(map[string]bool)
|
|
for _, w := range gate.Waiters {
|
|
waiterSet[w] = true
|
|
}
|
|
for _, addr := range notifyAddrs {
|
|
if !waiterSet[addr] {
|
|
newWaiters = append(newWaiters, addr)
|
|
waiterSet[addr] = true
|
|
}
|
|
}
|
|
|
|
addedCount = len(newWaiters)
|
|
|
|
if addedCount == 0 {
|
|
fmt.Println("All specified waiters are already registered on this gate")
|
|
return
|
|
}
|
|
|
|
// Update waiters - need to use SQLite directly for Waiters field
|
|
sqliteStore, ok := store.(*sqlite.SQLiteStorage)
|
|
if !ok {
|
|
fmt.Fprintf(os.Stderr, "Error: gate wait requires SQLite storage\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
allWaiters := append(gate.Waiters, newWaiters...)
|
|
waitersJSON, _ := json.Marshal(allWaiters)
|
|
|
|
// Use raw SQL to update the waiters field
|
|
_, err = sqliteStore.UnderlyingDB().ExecContext(ctx, `UPDATE issues SET waiters = ?, updated_at = ? WHERE id = ?`,
|
|
string(waitersJSON), time.Now(), gateID)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error adding waiters: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
markDirtyAndScheduleFlush()
|
|
|
|
if jsonOutput {
|
|
updatedGate, _ := store.GetIssue(ctx, gateID)
|
|
outputJSON(updatedGate)
|
|
return
|
|
}
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "Error: no database connection\n")
|
|
os.Exit(1)
|
|
}
|
|
|
|
if addedCount == 0 {
|
|
fmt.Println("All specified waiters are already registered on this gate")
|
|
return
|
|
}
|
|
|
|
if jsonOutput {
|
|
// For daemon mode, output the result
|
|
outputJSON(map[string]interface{}{"added_count": addedCount, "gate_id": gateID})
|
|
return
|
|
}
|
|
|
|
fmt.Printf("%s Added %d waiter(s) to gate %s\n", ui.RenderPass("✓"), addedCount, gateID)
|
|
for _, addr := range newWaiters {
|
|
fmt.Printf(" + %s\n", addr)
|
|
}
|
|
},
|
|
}
|
|
|
|
var gateEvalCmd = &cobra.Command{
|
|
Use: "eval",
|
|
Short: "Evaluate pending gates and close elapsed ones",
|
|
Long: `Evaluate all open gates and close those whose conditions are met.
|
|
|
|
Supported gate types:
|
|
- timer gates: closed when elapsed time exceeds timeout
|
|
- gh:run gates: closed when GitHub Actions run completes (requires gh CLI)
|
|
- gh:pr gates: closed when PR is merged/closed (requires gh CLI)
|
|
|
|
This command is idempotent and safe to run repeatedly.`,
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
CheckReadonly("gate eval")
|
|
ctx := rootCtx
|
|
dryRun, _ := cmd.Flags().GetBool("dry-run")
|
|
|
|
var gates []*types.Issue
|
|
|
|
// Get all open gates
|
|
if daemonClient != nil {
|
|
resp, err := daemonClient.GateList(&rpc.GateListArgs{All: false})
|
|
if err != nil {
|
|
FatalError("gate eval: %v", err)
|
|
}
|
|
if err := json.Unmarshal(resp.Data, &gates); err != nil {
|
|
FatalError("failed to parse gates: %v", err)
|
|
}
|
|
} else if store != nil {
|
|
gateType := types.TypeGate
|
|
openStatus := types.StatusOpen
|
|
filter := types.IssueFilter{
|
|
IssueType: &gateType,
|
|
Status: &openStatus,
|
|
}
|
|
var err error
|
|
gates, err = store.SearchIssues(ctx, "", filter)
|
|
if err != nil {
|
|
FatalError("listing gates: %v", err)
|
|
}
|
|
} else {
|
|
FatalError("no database connection")
|
|
}
|
|
|
|
if len(gates) == 0 {
|
|
if !jsonOutput {
|
|
fmt.Println("No open gates to evaluate")
|
|
}
|
|
return
|
|
}
|
|
|
|
var closed []string
|
|
var skipped []string
|
|
var awaitingHuman []string
|
|
var awaitingMail []string
|
|
now := time.Now()
|
|
|
|
for _, gate := range gates {
|
|
var shouldClose bool
|
|
var reason string
|
|
|
|
switch gate.AwaitType {
|
|
case "timer":
|
|
shouldClose, reason = evalTimerGate(gate, now)
|
|
case "gh:run":
|
|
shouldClose, reason = evalGHRunGate(gate)
|
|
case "gh:pr":
|
|
shouldClose, reason = evalGHPRGate(gate)
|
|
case "human":
|
|
// Human gates require explicit approval via 'bd gate approve'
|
|
awaitingHuman = append(awaitingHuman, gate.ID)
|
|
continue
|
|
case "mail":
|
|
// Mail gates check for messages matching the pattern
|
|
if store != nil {
|
|
shouldClose, reason = evalMailGate(ctx, store, gate)
|
|
} else {
|
|
// Daemon mode - can't evaluate mail gates without store access
|
|
awaitingMail = append(awaitingMail, gate.ID)
|
|
continue
|
|
}
|
|
default:
|
|
// Unsupported gate type - skip
|
|
skipped = append(skipped, gate.ID)
|
|
continue
|
|
}
|
|
|
|
if !shouldClose {
|
|
continue
|
|
}
|
|
|
|
// Gate condition met - close it
|
|
if dryRun {
|
|
fmt.Printf("Would close gate %s (%s)\n", gate.ID, reason)
|
|
closed = append(closed, gate.ID)
|
|
continue
|
|
}
|
|
|
|
if daemonClient != nil {
|
|
_, err := daemonClient.GateClose(&rpc.GateCloseArgs{
|
|
ID: gate.ID,
|
|
Reason: reason,
|
|
})
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Warning: failed to close gate %s: %v\n", gate.ID, err)
|
|
continue
|
|
}
|
|
} else if store != nil {
|
|
if err := store.CloseIssue(ctx, gate.ID, reason, actor, ""); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Warning: failed to close gate %s: %v\n", gate.ID, err)
|
|
continue
|
|
}
|
|
markDirtyAndScheduleFlush()
|
|
}
|
|
|
|
closed = append(closed, gate.ID)
|
|
}
|
|
|
|
if jsonOutput {
|
|
outputJSON(map[string]interface{}{
|
|
"evaluated": len(gates),
|
|
"closed": closed,
|
|
"awaiting_human": awaitingHuman,
|
|
"awaiting_mail": awaitingMail,
|
|
"skipped": skipped,
|
|
})
|
|
return
|
|
}
|
|
|
|
if len(closed) == 0 {
|
|
fmt.Printf("Evaluated %d gates, none ready to close\n", len(gates))
|
|
} else {
|
|
action := "Closed"
|
|
if dryRun {
|
|
action = "Would close"
|
|
}
|
|
fmt.Printf("%s %s %d gate(s)\n", ui.RenderPass("✓"), action, len(closed))
|
|
for _, id := range closed {
|
|
fmt.Printf(" %s\n", id)
|
|
}
|
|
}
|
|
if len(awaitingHuman) > 0 {
|
|
fmt.Printf("Awaiting human approval: %s\n", strings.Join(awaitingHuman, ", "))
|
|
fmt.Printf(" Use 'bd gate approve <id>' to approve\n")
|
|
}
|
|
if len(awaitingMail) > 0 {
|
|
fmt.Printf("Awaiting mail: %s\n", strings.Join(awaitingMail, ", "))
|
|
}
|
|
if len(skipped) > 0 {
|
|
fmt.Printf("Skipped %d unsupported gate(s): %s\n", len(skipped), strings.Join(skipped, ", "))
|
|
}
|
|
},
|
|
}
|
|
|
|
// evalTimerGate checks if a timer gate's duration has elapsed.
|
|
func evalTimerGate(gate *types.Issue, now time.Time) (bool, string) {
|
|
if gate.Timeout <= 0 {
|
|
return false, "" // No timeout set
|
|
}
|
|
|
|
elapsed := now.Sub(gate.CreatedAt)
|
|
if elapsed < gate.Timeout {
|
|
return false, "" // Not yet elapsed
|
|
}
|
|
|
|
return true, fmt.Sprintf("Timer elapsed (%v)", gate.Timeout)
|
|
}
|
|
|
|
// ghRunStatus is the decoding target for the JSON printed by
// `gh run view --json status,conclusion`.
type ghRunStatus struct {
	// Status is the run's lifecycle state: queued, in_progress, completed.
	Status string `json:"status"`
	// Conclusion is the run's outcome: success, failure, canceled, skipped, etc.
	Conclusion string `json:"conclusion"`
}
|
|
|
|
// evalGHRunGate checks if a GitHub Actions run has completed.
|
|
// Uses `gh run view <run_id> --json status,conclusion` to check status.
|
|
func evalGHRunGate(gate *types.Issue) (bool, string) {
|
|
runID := gate.AwaitID
|
|
if runID == "" {
|
|
return false, ""
|
|
}
|
|
|
|
// Run gh CLI to get run status
|
|
cmd := exec.Command("gh", "run", "view", runID, "--json", "status,conclusion") //nolint:gosec // runID is from trusted issue.AwaitID field
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
// gh CLI failed - could be network issue, invalid run ID, or gh not installed
|
|
// Don't close the gate, just skip it
|
|
return false, ""
|
|
}
|
|
|
|
var status ghRunStatus
|
|
if err := json.Unmarshal(output, &status); err != nil {
|
|
return false, ""
|
|
}
|
|
|
|
// Only close if status is "completed"
|
|
if status.Status != "completed" {
|
|
return false, ""
|
|
}
|
|
|
|
// Run completed - include conclusion in reason
|
|
reason := fmt.Sprintf("GitHub Actions run %s completed", runID)
|
|
if status.Conclusion != "" {
|
|
reason = fmt.Sprintf("GitHub Actions run %s: %s", runID, status.Conclusion)
|
|
}
|
|
|
|
return true, reason
|
|
}
|
|
|
|
// ghPRStatus is the decoding target for the JSON printed by
// `gh pr view --json state,mergedAt`.
type ghPRStatus struct {
	// State is the pull request state: OPEN, CLOSED, MERGED.
	State string `json:"state"`
	// MergedAt is non-empty if the pull request was merged.
	MergedAt string `json:"mergedAt"`
}
|
|
|
|
// evalGHPRGate checks if a GitHub PR has been merged or closed.
|
|
// Uses `gh pr view <pr_number> --json state,mergedAt` to check status.
|
|
func evalGHPRGate(gate *types.Issue) (bool, string) {
|
|
prNumber := gate.AwaitID
|
|
if prNumber == "" {
|
|
return false, ""
|
|
}
|
|
|
|
// Run gh CLI to get PR status
|
|
cmd := exec.Command("gh", "pr", "view", prNumber, "--json", "state,mergedAt") //nolint:gosec // prNumber is from trusted issue.AwaitID field
|
|
output, err := cmd.Output()
|
|
if err != nil {
|
|
// gh CLI failed - could be network issue, invalid PR, or gh not installed
|
|
// Don't close the gate, just skip it
|
|
return false, ""
|
|
}
|
|
|
|
var status ghPRStatus
|
|
if err := json.Unmarshal(output, &status); err != nil {
|
|
return false, ""
|
|
}
|
|
|
|
// Close gate if PR is no longer OPEN
|
|
// State is "MERGED" for merged PRs, "CLOSED" for closed-without-merge
|
|
switch status.State {
|
|
case "MERGED":
|
|
return true, fmt.Sprintf("PR #%s merged", prNumber)
|
|
case "CLOSED":
|
|
return true, fmt.Sprintf("PR #%s closed without merge", prNumber)
|
|
default:
|
|
// Still OPEN
|
|
return false, ""
|
|
}
|
|
}
|
|
|
|
// evalMailGate checks if any message matching the pattern exists.
|
|
// The pattern (await_id) is matched as a case-insensitive substring of message subjects.
|
|
// If waiters are specified, only messages addressed to those waiters are considered.
|
|
func evalMailGate(ctx context.Context, store storage.Storage, gate *types.Issue) (bool, string) {
|
|
pattern := gate.AwaitID
|
|
if pattern == "" {
|
|
return false, ""
|
|
}
|
|
|
|
// Search for messages
|
|
msgType := types.TypeMessage
|
|
openStatus := types.StatusOpen
|
|
filter := types.IssueFilter{
|
|
IssueType: &msgType,
|
|
Status: &openStatus,
|
|
}
|
|
|
|
messages, err := store.SearchIssues(ctx, "", filter)
|
|
if err != nil {
|
|
return false, ""
|
|
}
|
|
|
|
// Convert pattern to lowercase for case-insensitive matching
|
|
patternLower := strings.ToLower(pattern)
|
|
|
|
// Build waiter set for efficient lookup (if waiters specified)
|
|
waiterSet := make(map[string]bool)
|
|
for _, w := range gate.Waiters {
|
|
waiterSet[w] = true
|
|
}
|
|
|
|
// Check each message
|
|
for _, msg := range messages {
|
|
// Check subject contains pattern (case-insensitive)
|
|
if !strings.Contains(strings.ToLower(msg.Title), patternLower) {
|
|
continue
|
|
}
|
|
|
|
// If waiters specified, check if message is addressed to a waiter
|
|
// Messages use Assignee field for recipient
|
|
if len(waiterSet) > 0 {
|
|
if !waiterSet[msg.Assignee] {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Found a matching message
|
|
return true, fmt.Sprintf("Mail received: %s", msg.Title)
|
|
}
|
|
|
|
return false, ""
|
|
}
|
|
|
|
func init() {
|
|
// Gate eval flags
|
|
gateEvalCmd.Flags().Bool("dry-run", false, "Show what would be closed without actually closing")
|
|
gateEvalCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate create flags
|
|
gateCreateCmd.Flags().String("await", "", "Await spec: gh:run:<id>, gh:pr:<id>, timer:<duration>, human:<prompt>, mail:<pattern> (required)")
|
|
gateCreateCmd.Flags().String("timeout", "", "Timeout duration (e.g., 30m, 1h)")
|
|
gateCreateCmd.Flags().StringSlice("notify", nil, "Mail addresses to notify when gate clears (repeatable)")
|
|
gateCreateCmd.Flags().String("title", "", "Custom title for the gate")
|
|
gateCreateCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate show flags
|
|
gateShowCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate list flags
|
|
gateListCmd.Flags().Bool("all", false, "Show all gates including closed")
|
|
gateListCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate close flags
|
|
gateCloseCmd.Flags().StringP("reason", "r", "", "Reason for closing")
|
|
gateCloseCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate approve flags
|
|
gateApproveCmd.Flags().String("comment", "", "Optional approval comment")
|
|
gateApproveCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Gate wait flags
|
|
gateWaitCmd.Flags().StringSlice("notify", nil, "Mail addresses to add as waiters (repeatable, required)")
|
|
gateWaitCmd.Flags().Bool("json", false, "Output JSON format")
|
|
|
|
// Add subcommands to gate command
|
|
gateCmd.AddCommand(gateCreateCmd)
|
|
gateCmd.AddCommand(gateShowCmd)
|
|
gateCmd.AddCommand(gateListCmd)
|
|
gateCmd.AddCommand(gateCloseCmd)
|
|
gateCmd.AddCommand(gateApproveCmd)
|
|
gateCmd.AddCommand(gateWaitCmd)
|
|
gateCmd.AddCommand(gateEvalCmd)
|
|
|
|
// Add gate command to root
|
|
rootCmd.AddCommand(gateCmd)
|
|
}
|