Files
beads/cmd/bd/merge_slot.go
jane 251ded73be fix(merge-slot): use daemon RPC to get issue_prefix in daemon mode
When running in daemon mode, getMergeSlotID() was not using the daemon
RPC to retrieve the configured issue_prefix, causing it to fall back
to the hardcoded "bd" default. This fix adds the missing daemon path
that uses daemonClient.GetConfig() to properly retrieve the prefix,
matching the pattern used in create.go.

Fixes #1096

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 03:44:02 -08:00

549 lines
16 KiB
Go

package main
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/spf13/cobra"
"github.com/steveyegge/beads/internal/config"
"github.com/steveyegge/beads/internal/rpc"
"github.com/steveyegge/beads/internal/types"
"github.com/steveyegge/beads/internal/ui"
"github.com/steveyegge/beads/internal/utils"
)
// mergeSlotCmd is the parent command for merge-slot operations
var mergeSlotCmd = &cobra.Command{
Use: "merge-slot",
GroupID: "issues",
Short: "Manage merge-slot gates for serialized conflict resolution",
Long: `Merge-slot gates serialize conflict resolution in the merge queue.
A merge slot is an exclusive access primitive: only one agent can hold it at a time.
This prevents "monkey knife fights" where multiple polecats race to resolve conflicts
and create cascading conflicts.
Each rig has one merge slot bead: <prefix>-merge-slot (labeled gt:slot).
The slot uses:
- status=open: slot is available
- status=in_progress: slot is held
- holder field: who currently holds the slot
- waiters field: priority-ordered queue of waiters
Examples:
bd merge-slot create # Create merge slot for current rig
bd merge-slot check # Check if slot is available
bd merge-slot acquire # Try to acquire the slot
bd merge-slot release # Release the slot
bd merge-slot wait # Wait for slot to become available`,
}
// mergeSlotCreateCmd creates a merge slot bead for the current rig
var mergeSlotCreateCmd = &cobra.Command{
Use: "create",
Short: "Create a merge slot bead for the current rig",
Long: `Create a merge slot bead for serialized conflict resolution.
The slot ID is automatically generated based on the beads prefix (e.g., gt-merge-slot).
The slot is created with status=open (available).`,
Args: cobra.NoArgs,
RunE: runMergeSlotCreate,
}
// mergeSlotCheckCmd checks the current merge slot status
var mergeSlotCheckCmd = &cobra.Command{
Use: "check",
Short: "Check merge slot availability",
Long: `Check if the merge slot is available or held.
Returns:
- available: slot can be acquired
- held by <holder>: slot is currently held
- not found: no merge slot exists for this rig`,
Args: cobra.NoArgs,
RunE: runMergeSlotCheck,
}
// mergeSlotAcquireCmd attempts to acquire the merge slot
var mergeSlotAcquireCmd = &cobra.Command{
Use: "acquire",
Short: "Acquire the merge slot",
Long: `Attempt to acquire the merge slot for exclusive access.
If the slot is available (status=open), it will be acquired:
- status set to in_progress
- holder set to the requester
If the slot is held (status=in_progress), the command fails and the
requester is optionally added to the waiters list (use --wait flag).
Use --holder to specify who is acquiring (default: BD_ACTOR env var).`,
Args: cobra.NoArgs,
RunE: runMergeSlotAcquire,
}
// mergeSlotReleaseCmd releases the merge slot
var mergeSlotReleaseCmd = &cobra.Command{
Use: "release",
Short: "Release the merge slot",
Long: `Release the merge slot after conflict resolution is complete.
Sets status back to open and clears the holder field.
If there are waiters, the highest-priority waiter should then acquire.`,
Args: cobra.NoArgs,
RunE: runMergeSlotRelease,
}
var (
mergeSlotHolder string
mergeSlotAddWaiter bool
)
func init() {
mergeSlotAcquireCmd.Flags().StringVar(&mergeSlotHolder, "holder", "", "Who is acquiring the slot (default: BD_ACTOR)")
mergeSlotAcquireCmd.Flags().BoolVar(&mergeSlotAddWaiter, "wait", false, "Add to waiters list if slot is held")
mergeSlotReleaseCmd.Flags().StringVar(&mergeSlotHolder, "holder", "", "Who is releasing the slot (for verification)")
mergeSlotCmd.AddCommand(mergeSlotCreateCmd)
mergeSlotCmd.AddCommand(mergeSlotCheckCmd)
mergeSlotCmd.AddCommand(mergeSlotAcquireCmd)
mergeSlotCmd.AddCommand(mergeSlotReleaseCmd)
rootCmd.AddCommand(mergeSlotCmd)
}
// getMergeSlotID returns the merge slot bead ID for the current rig
func getMergeSlotID() string {
// Use the prefix from beads config (default "bd")
prefix := "bd"
// First try config.yaml (issue-prefix)
if configPrefix := config.GetString("issue-prefix"); configPrefix != "" {
prefix = strings.TrimSuffix(configPrefix, "-")
} else if daemonClient != nil {
// Daemon mode - use RPC to get config
if configResp, err := daemonClient.GetConfig(&rpc.GetConfigArgs{Key: "issue_prefix"}); err == nil && configResp.Value != "" {
prefix = strings.TrimSuffix(configResp.Value, "-")
}
} else if store != nil {
// Direct mode - check database config
if dbPrefix, err := store.GetConfig(rootCtx, "issue_prefix"); err == nil && dbPrefix != "" {
prefix = strings.TrimSuffix(dbPrefix, "-")
}
}
return prefix + "-merge-slot"
}
func runMergeSlotCreate(cmd *cobra.Command, args []string) error {
CheckReadonly("merge-slot create")
slotID := getMergeSlotID()
ctx := rootCtx
// Check if slot already exists
var existing *types.Issue
if daemonClient != nil {
resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID})
if err == nil && resp.Success {
if uerr := json.Unmarshal(resp.Data, &existing); uerr == nil {
fmt.Printf("Merge slot already exists: %s\n", slotID)
return nil
}
}
} else {
existing, _ = store.GetIssue(ctx, slotID)
if existing != nil {
fmt.Printf("Merge slot already exists: %s\n", slotID)
return nil
}
}
// Create the merge slot bead
title := "Merge Slot"
description := "Exclusive access slot for serialized conflict resolution in the merge queue."
if daemonClient != nil {
createArgs := &rpc.CreateArgs{
ID: slotID,
Title: title,
Description: description,
IssueType: string(types.TypeTask), // Use task type; gt:slot label marks it as slot
Priority: 0, // P0 - system infrastructure
Labels: []string{"gt:slot"}, // Gas Town slot label
}
resp, err := daemonClient.Create(createArgs)
if err != nil {
return fmt.Errorf("failed to create merge slot: %w", err)
}
if !resp.Success {
return fmt.Errorf("failed to create merge slot: %s", resp.Error)
}
} else {
issue := &types.Issue{
ID: slotID,
Title: title,
Description: description,
IssueType: types.TypeTask, // Use task type; gt:slot label marks it as slot
Status: types.StatusOpen,
Priority: 0,
}
if err := store.CreateIssue(ctx, issue, actor); err != nil {
return fmt.Errorf("failed to create merge slot: %w", err)
}
// Add gt:slot label to mark as slot bead
if err := store.AddLabel(ctx, slotID, "gt:slot", actor); err != nil {
// Non-fatal: log warning but don't fail creation
fmt.Fprintf(os.Stderr, "warning: failed to add gt:slot label: %v\n", err)
}
markDirtyAndScheduleFlush()
}
if jsonOutput {
result := map[string]interface{}{
"id": slotID,
"status": "open",
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("%s Created merge slot: %s\n", ui.RenderPass("✓"), slotID)
return nil
}
func runMergeSlotCheck(cmd *cobra.Command, args []string) error {
slotID := getMergeSlotID()
ctx := rootCtx
// Get the slot bead
var slot *types.Issue
if daemonClient != nil {
resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID})
if err != nil {
if jsonOutput {
result := map[string]interface{}{
"id": slotID,
"available": false,
"error": "not found",
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("Merge slot not found: %s\n", slotID)
fmt.Printf("Run 'bd merge-slot create' to create one.\n")
return nil
}
var details types.IssueDetails
if uerr := json.Unmarshal(resp.Data, &details); uerr != nil {
return fmt.Errorf("parsing response: %w", uerr)
}
slot = &details.Issue
} else {
var err error
slot, err = store.GetIssue(ctx, slotID)
if err != nil || slot == nil {
if jsonOutput {
result := map[string]interface{}{
"id": slotID,
"available": false,
"error": "not found",
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("Merge slot not found: %s\n", slotID)
fmt.Printf("Run 'bd merge-slot create' to create one.\n")
return nil
}
}
available := slot.Status == types.StatusOpen
holder := slot.Holder
waiters := slot.Waiters
if jsonOutput {
result := map[string]interface{}{
"id": slotID,
"available": available,
"status": string(slot.Status),
"holder": emptyToNil(holder),
"waiters": waiters,
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
if available {
fmt.Printf("%s Merge slot available: %s\n", ui.RenderPass("✓"), slotID)
} else {
fmt.Printf("%s Merge slot held: %s\n", ui.RenderAccent("○"), slotID)
fmt.Printf(" Holder: %s\n", holder)
if len(waiters) > 0 {
fmt.Printf(" Waiters: %d\n", len(waiters))
for i, w := range waiters {
fmt.Printf(" %d. %s\n", i+1, w)
}
}
}
return nil
}
func runMergeSlotAcquire(cmd *cobra.Command, args []string) error {
CheckReadonly("merge-slot acquire")
slotID := getMergeSlotID()
ctx := rootCtx
// Determine holder
holder := mergeSlotHolder
if holder == "" {
holder = actor
}
if holder == "" {
return fmt.Errorf("no holder specified; use --holder or set BD_ACTOR env var")
}
// Get the slot bead
var slot *types.Issue
if daemonClient != nil {
// Try to resolve the slot ID first
resp, err := daemonClient.ResolveID(&rpc.ResolveIDArgs{ID: slotID})
if err != nil {
return fmt.Errorf("merge slot not found: %s (run 'bd merge-slot create' first)", slotID)
}
var resolvedID string
if uerr := json.Unmarshal(resp.Data, &resolvedID); uerr != nil {
return fmt.Errorf("parsing response: %w", uerr)
}
showResp, showErr := daemonClient.Show(&rpc.ShowArgs{ID: resolvedID})
if showErr != nil {
return fmt.Errorf("merge slot not found: %s", slotID)
}
var details types.IssueDetails
if uerr := json.Unmarshal(showResp.Data, &details); uerr != nil {
return fmt.Errorf("parsing response: %w", uerr)
}
slot = &details.Issue
} else {
var err error
resolvedID, err := utils.ResolvePartialID(ctx, store, slotID)
if err != nil {
return fmt.Errorf("merge slot not found: %s (run 'bd merge-slot create' first)", slotID)
}
slot, err = store.GetIssue(ctx, resolvedID)
if err != nil || slot == nil {
return fmt.Errorf("merge slot not found: %s", slotID)
}
}
// Check slot availability
if slot.Status != types.StatusOpen {
// Slot is held
if mergeSlotAddWaiter {
// Add to waiters list
alreadyWaiting := false
for _, w := range slot.Waiters {
if w == holder {
alreadyWaiting = true
break
}
}
if !alreadyWaiting {
newWaiters := append(slot.Waiters, holder)
if daemonClient != nil {
updateArgs := &rpc.UpdateArgs{
ID: slot.ID,
Waiters: newWaiters,
}
_, err := daemonClient.Update(updateArgs)
if err != nil {
return fmt.Errorf("failed to add waiter: %w", err)
}
} else {
updates := map[string]interface{}{
"waiters": newWaiters,
}
if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil {
return fmt.Errorf("failed to add waiter: %w", err)
}
markDirtyAndScheduleFlush()
}
}
if jsonOutput {
result := map[string]interface{}{
"id": slot.ID,
"acquired": false,
"waiting": true,
"holder": slot.Holder,
"position": len(slot.Waiters) + 1,
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("%s Slot held by %s, added to waiters queue (position %d)\n",
ui.RenderAccent("○"), slot.Holder, len(slot.Waiters)+1)
os.Exit(1) // Exit with error to indicate slot not acquired
}
if jsonOutput {
result := map[string]interface{}{
"id": slot.ID,
"acquired": false,
"holder": slot.Holder,
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("%s Slot held by: %s\n", ui.RenderFail("✗"), slot.Holder)
fmt.Printf("Use --wait to add yourself to the waiters queue.\n")
os.Exit(1) // Exit with error to indicate slot not acquired
}
// Slot is available - acquire it
inProgressStatus := string(types.StatusInProgress)
if daemonClient != nil {
updateArgs := &rpc.UpdateArgs{
ID: slot.ID,
Status: &inProgressStatus,
Holder: &holder,
}
_, err := daemonClient.Update(updateArgs)
if err != nil {
return fmt.Errorf("failed to acquire slot: %w", err)
}
} else {
updates := map[string]interface{}{
"status": types.StatusInProgress,
"holder": holder,
}
if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil {
return fmt.Errorf("failed to acquire slot: %w", err)
}
markDirtyAndScheduleFlush()
}
if jsonOutput {
result := map[string]interface{}{
"id": slot.ID,
"acquired": true,
"holder": holder,
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("%s Acquired merge slot: %s\n", ui.RenderPass("✓"), slot.ID)
fmt.Printf(" Holder: %s\n", holder)
return nil
}
func runMergeSlotRelease(cmd *cobra.Command, args []string) error {
CheckReadonly("merge-slot release")
slotID := getMergeSlotID()
ctx := rootCtx
// Get the slot bead
var slot *types.Issue
if daemonClient != nil {
resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID})
if err != nil {
return fmt.Errorf("merge slot not found: %s", slotID)
}
var details types.IssueDetails
if uerr := json.Unmarshal(resp.Data, &details); uerr != nil {
return fmt.Errorf("parsing response: %w", uerr)
}
slot = &details.Issue
} else {
var err error
slot, err = store.GetIssue(ctx, slotID)
if err != nil || slot == nil {
return fmt.Errorf("merge slot not found: %s", slotID)
}
}
// Verify holder if specified
if mergeSlotHolder != "" && slot.Holder != mergeSlotHolder {
return fmt.Errorf("slot held by %s, not %s", slot.Holder, mergeSlotHolder)
}
// Check if slot is actually held
if slot.Status == types.StatusOpen {
if jsonOutput {
result := map[string]interface{}{
"id": slot.ID,
"released": false,
"error": "slot not held",
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("Slot is not held: %s\n", slot.ID)
return nil
}
previousHolder := slot.Holder
waiters := slot.Waiters
// Release the slot
openStatus := string(types.StatusOpen)
emptyHolder := ""
if daemonClient != nil {
updateArgs := &rpc.UpdateArgs{
ID: slot.ID,
Status: &openStatus,
Holder: &emptyHolder,
}
_, err := daemonClient.Update(updateArgs)
if err != nil {
return fmt.Errorf("failed to release slot: %w", err)
}
} else {
updates := map[string]interface{}{
"status": types.StatusOpen,
"holder": "",
}
if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil {
return fmt.Errorf("failed to release slot: %w", err)
}
markDirtyAndScheduleFlush()
}
if jsonOutput {
result := map[string]interface{}{
"id": slot.ID,
"released": true,
"previous_holder": previousHolder,
"waiters": len(waiters),
}
encoder := json.NewEncoder(os.Stdout)
encoder.SetIndent("", " ")
return encoder.Encode(result)
}
fmt.Printf("%s Released merge slot: %s\n", ui.RenderPass("✓"), slot.ID)
fmt.Printf(" Previous holder: %s\n", previousHolder)
if len(waiters) > 0 {
fmt.Printf(" Waiters pending: %d\n", len(waiters))
fmt.Printf(" Next in queue: %s\n", waiters[0])
}
return nil
}