diff --git a/cmd/bd/merge_slot.go b/cmd/bd/merge_slot.go new file mode 100644 index 00000000..df99eda3 --- /dev/null +++ b/cmd/bd/merge_slot.go @@ -0,0 +1,538 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + "github.com/steveyegge/beads/internal/config" + "github.com/steveyegge/beads/internal/rpc" + "github.com/steveyegge/beads/internal/types" + "github.com/steveyegge/beads/internal/ui" + "github.com/steveyegge/beads/internal/utils" +) + +// mergeSlotCmd is the parent command for merge-slot operations +var mergeSlotCmd = &cobra.Command{ + Use: "merge-slot", + GroupID: "issues", + Short: "Manage merge-slot gates for serialized conflict resolution", + Long: `Merge-slot gates serialize conflict resolution in the merge queue. + +A merge slot is an exclusive access primitive: only one agent can hold it at a time. +This prevents "monkey knife fights" where multiple polecats race to resolve conflicts +and create cascading conflicts. + +Each rig has one merge slot bead: -merge-slot (type=slot). +The slot uses: + - status=open: slot is available + - status=in_progress: slot is held + - holder field: who currently holds the slot + - waiters field: priority-ordered queue of waiters + +Examples: + bd merge-slot create # Create merge slot for current rig + bd merge-slot check # Check if slot is available + bd merge-slot acquire # Try to acquire the slot + bd merge-slot release # Release the slot + bd merge-slot wait # Wait for slot to become available`, +} + +// mergeSlotCreateCmd creates a merge slot bead for the current rig +var mergeSlotCreateCmd = &cobra.Command{ + Use: "create", + Short: "Create a merge slot bead for the current rig", + Long: `Create a merge slot bead for serialized conflict resolution. + +The slot ID is automatically generated based on the beads prefix (e.g., gt-merge-slot). +The slot is created with status=open (available).`, + Args: cobra.NoArgs, + RunE: runMergeSlotCreate, +} + +// mergeSlotCheckCmd checks the current merge slot status +var mergeSlotCheckCmd = &cobra.Command{ + Use: "check", + Short: "Check merge slot availability", + Long: `Check if the merge slot is available or held. + +Returns: + - available: slot can be acquired + - held by : slot is currently held + - not found: no merge slot exists for this rig`, + Args: cobra.NoArgs, + RunE: runMergeSlotCheck, +} + +// mergeSlotAcquireCmd attempts to acquire the merge slot +var mergeSlotAcquireCmd = &cobra.Command{ + Use: "acquire", + Short: "Acquire the merge slot", + Long: `Attempt to acquire the merge slot for exclusive access. + +If the slot is available (status=open), it will be acquired: + - status set to in_progress + - holder set to the requester + +If the slot is held (status=in_progress), the command fails and the +requester is optionally added to the waiters list (use --wait flag). + +Use --holder to specify who is acquiring (default: BD_ACTOR env var).`, + Args: cobra.NoArgs, + RunE: runMergeSlotAcquire, +} + +// mergeSlotReleaseCmd releases the merge slot +var mergeSlotReleaseCmd = &cobra.Command{ + Use: "release", + Short: "Release the merge slot", + Long: `Release the merge slot after conflict resolution is complete. + +Sets status back to open and clears the holder field. +If there are waiters, the highest-priority waiter should then acquire.`, + Args: cobra.NoArgs, + RunE: runMergeSlotRelease, +} + +var ( + mergeSlotHolder string + mergeSlotAddWaiter bool +) + +func init() { + mergeSlotAcquireCmd.Flags().StringVar(&mergeSlotHolder, "holder", "", "Who is acquiring the slot (default: BD_ACTOR)") + mergeSlotAcquireCmd.Flags().BoolVar(&mergeSlotAddWaiter, "wait", false, "Add to waiters list if slot is held") + mergeSlotReleaseCmd.Flags().StringVar(&mergeSlotHolder, "holder", "", "Who is releasing the slot (for verification)") + + mergeSlotCmd.AddCommand(mergeSlotCreateCmd) + mergeSlotCmd.AddCommand(mergeSlotCheckCmd) + mergeSlotCmd.AddCommand(mergeSlotAcquireCmd) + mergeSlotCmd.AddCommand(mergeSlotReleaseCmd) + rootCmd.AddCommand(mergeSlotCmd) +} + +// getMergeSlotID returns the merge slot bead ID for the current rig +func getMergeSlotID() string { + // Use the prefix from beads config (default "bd") + prefix := "bd" + + // First try config.yaml (issue-prefix) + if configPrefix := config.GetString("issue-prefix"); configPrefix != "" { + prefix = strings.TrimSuffix(configPrefix, "-") + } else if store != nil { + // Fall back to database config + if dbPrefix, err := store.GetConfig(rootCtx, "issue_prefix"); err == nil && dbPrefix != "" { + prefix = strings.TrimSuffix(dbPrefix, "-") + } + } + return prefix + "-merge-slot" +} + +func runMergeSlotCreate(cmd *cobra.Command, args []string) error { + CheckReadonly("merge-slot create") + + slotID := getMergeSlotID() + ctx := rootCtx + + // Check if slot already exists + var existing *types.Issue + if daemonClient != nil { + resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID}) + if err == nil && resp.Success { + if uerr := json.Unmarshal(resp.Data, &existing); uerr == nil { + fmt.Printf("Merge slot already exists: %s\n", slotID) + return nil + } + } + } else { + existing, _ = store.GetIssue(ctx, slotID) + if existing != nil { + fmt.Printf("Merge slot already exists: %s\n", slotID) + return nil + } + } + + // Create the merge slot bead + title := "Merge Slot" + description := "Exclusive access slot for serialized conflict resolution in the merge queue." + slotType := types.TypeSlot + + if daemonClient != nil { + createArgs := &rpc.CreateArgs{ + ID: slotID, + Title: title, + Description: description, + IssueType: string(slotType), + Priority: 0, // P0 - system infrastructure + } + resp, err := daemonClient.Create(createArgs) + if err != nil { + return fmt.Errorf("failed to create merge slot: %w", err) + } + if !resp.Success { + return fmt.Errorf("failed to create merge slot: %s", resp.Error) + } + } else { + issue := &types.Issue{ + ID: slotID, + Title: title, + Description: description, + IssueType: slotType, + Status: types.StatusOpen, + Priority: 0, + } + if err := store.CreateIssue(ctx, issue, actor); err != nil { + return fmt.Errorf("failed to create merge slot: %w", err) + } + markDirtyAndScheduleFlush() + } + + if jsonOutput { + result := map[string]interface{}{ + "id": slotID, + "status": "open", + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + fmt.Printf("%s Created merge slot: %s\n", ui.RenderPass("✓"), slotID) + return nil +} + +func runMergeSlotCheck(cmd *cobra.Command, args []string) error { + slotID := getMergeSlotID() + ctx := rootCtx + + // Get the slot bead + var slot *types.Issue + if daemonClient != nil { + resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID}) + if err != nil { + if jsonOutput { + result := map[string]interface{}{ + "id": slotID, + "available": false, + "error": "not found", + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + fmt.Printf("Merge slot not found: %s\n", slotID) + fmt.Printf("Run 'bd merge-slot create' to create one.\n") + return nil + } + var details types.IssueDetails + if uerr := json.Unmarshal(resp.Data, &details); uerr != nil { + return fmt.Errorf("parsing response: %w", uerr) + } + slot = &details.Issue + } else { + var err error + slot, err = store.GetIssue(ctx, slotID) + if err != nil || slot == nil { + if jsonOutput { + result := map[string]interface{}{ + "id": slotID, + "available": false, + "error": "not found", + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + fmt.Printf("Merge slot not found: %s\n", slotID) + fmt.Printf("Run 'bd merge-slot create' to create one.\n") + return nil + } + } + + available := slot.Status == types.StatusOpen + holder := slot.Holder + waiters := slot.Waiters + + if jsonOutput { + result := map[string]interface{}{ + "id": slotID, + "available": available, + "status": string(slot.Status), + "holder": emptyToNil(holder), + "waiters": waiters, + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + if available { + fmt.Printf("%s Merge slot available: %s\n", ui.RenderPass("✓"), slotID) + } else { + fmt.Printf("%s Merge slot held: %s\n", ui.RenderAccent("○"), slotID) + fmt.Printf(" Holder: %s\n", holder) + if len(waiters) > 0 { + fmt.Printf(" Waiters: %d\n", len(waiters)) + for i, w := range waiters { + fmt.Printf(" %d. %s\n", i+1, w) + } + } + } + + return nil +} + +func runMergeSlotAcquire(cmd *cobra.Command, args []string) error { + CheckReadonly("merge-slot acquire") + + slotID := getMergeSlotID() + ctx := rootCtx + + // Determine holder + holder := mergeSlotHolder + if holder == "" { + holder = actor + } + if holder == "" { + return fmt.Errorf("no holder specified; use --holder or set BD_ACTOR env var") + } + + // Get the slot bead + var slot *types.Issue + if daemonClient != nil { + // Try to resolve the slot ID first + resp, err := daemonClient.ResolveID(&rpc.ResolveIDArgs{ID: slotID}) + if err != nil { + return fmt.Errorf("merge slot not found: %s (run 'bd merge-slot create' first)", slotID) + } + var resolvedID string + if uerr := json.Unmarshal(resp.Data, &resolvedID); uerr != nil { + return fmt.Errorf("parsing response: %w", uerr) + } + + showResp, showErr := daemonClient.Show(&rpc.ShowArgs{ID: resolvedID}) + if showErr != nil { + return fmt.Errorf("merge slot not found: %s", slotID) + } + var details types.IssueDetails + if uerr := json.Unmarshal(showResp.Data, &details); uerr != nil { + return fmt.Errorf("parsing response: %w", uerr) + } + slot = &details.Issue + } else { + var err error + resolvedID, err := utils.ResolvePartialID(ctx, store, slotID) + if err != nil { + return fmt.Errorf("merge slot not found: %s (run 'bd merge-slot create' first)", slotID) + } + slot, err = store.GetIssue(ctx, resolvedID) + if err != nil || slot == nil { + return fmt.Errorf("merge slot not found: %s", slotID) + } + } + + // Check slot availability + if slot.Status != types.StatusOpen { + // Slot is held + if mergeSlotAddWaiter { + // Add to waiters list + alreadyWaiting := false + for _, w := range slot.Waiters { + if w == holder { + alreadyWaiting = true + break + } + } + + if !alreadyWaiting { + newWaiters := append(slot.Waiters, holder) + if daemonClient != nil { + updateArgs := &rpc.UpdateArgs{ + ID: slot.ID, + Waiters: newWaiters, + } + _, err := daemonClient.Update(updateArgs) + if err != nil { + return fmt.Errorf("failed to add waiter: %w", err) + } + } else { + updates := map[string]interface{}{ + "waiters": newWaiters, + } + if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil { + return fmt.Errorf("failed to add waiter: %w", err) + } + markDirtyAndScheduleFlush() + } + } + + if jsonOutput { + result := map[string]interface{}{ + "id": slot.ID, + "acquired": false, + "waiting": true, + "holder": slot.Holder, + "position": len(slot.Waiters) + 1, + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + fmt.Printf("%s Slot held by %s, added to waiters queue (position %d)\n", + ui.RenderAccent("○"), slot.Holder, len(slot.Waiters)+1) + os.Exit(1) // Exit with error to indicate slot not acquired + } + + if jsonOutput { + result := map[string]interface{}{ + "id": slot.ID, + "acquired": false, + "holder": slot.Holder, + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + fmt.Printf("%s Slot held by: %s\n", ui.RenderFail("✗"), slot.Holder) + fmt.Printf("Use --wait to add yourself to the waiters queue.\n") + os.Exit(1) // Exit with error to indicate slot not acquired + } + + // Slot is available - acquire it + inProgressStatus := string(types.StatusInProgress) + if daemonClient != nil { + updateArgs := &rpc.UpdateArgs{ + ID: slot.ID, + Status: &inProgressStatus, + Holder: &holder, + } + _, err := daemonClient.Update(updateArgs) + if err != nil { + return fmt.Errorf("failed to acquire slot: %w", err) + } + } else { + updates := map[string]interface{}{ + "status": types.StatusInProgress, + "holder": holder, + } + if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil { + return fmt.Errorf("failed to acquire slot: %w", err) + } + markDirtyAndScheduleFlush() + } + + if jsonOutput { + result := map[string]interface{}{ + "id": slot.ID, + "acquired": true, + "holder": holder, + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + fmt.Printf("%s Acquired merge slot: %s\n", ui.RenderPass("✓"), slot.ID) + fmt.Printf(" Holder: %s\n", holder) + return nil +} + +func runMergeSlotRelease(cmd *cobra.Command, args []string) error { + CheckReadonly("merge-slot release") + + slotID := getMergeSlotID() + ctx := rootCtx + + // Get the slot bead + var slot *types.Issue + if daemonClient != nil { + resp, err := daemonClient.Show(&rpc.ShowArgs{ID: slotID}) + if err != nil { + return fmt.Errorf("merge slot not found: %s", slotID) + } + var details types.IssueDetails + if uerr := json.Unmarshal(resp.Data, &details); uerr != nil { + return fmt.Errorf("parsing response: %w", uerr) + } + slot = &details.Issue + } else { + var err error + slot, err = store.GetIssue(ctx, slotID) + if err != nil || slot == nil { + return fmt.Errorf("merge slot not found: %s", slotID) + } + } + + // Verify holder if specified + if mergeSlotHolder != "" && slot.Holder != mergeSlotHolder { + return fmt.Errorf("slot held by %s, not %s", slot.Holder, mergeSlotHolder) + } + + // Check if slot is actually held + if slot.Status == types.StatusOpen { + if jsonOutput { + result := map[string]interface{}{ + "id": slot.ID, + "released": false, + "error": "slot not held", + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + fmt.Printf("Slot is not held: %s\n", slot.ID) + return nil + } + + previousHolder := slot.Holder + waiters := slot.Waiters + + // Release the slot + openStatus := string(types.StatusOpen) + emptyHolder := "" + if daemonClient != nil { + updateArgs := &rpc.UpdateArgs{ + ID: slot.ID, + Status: &openStatus, + Holder: &emptyHolder, + } + _, err := daemonClient.Update(updateArgs) + if err != nil { + return fmt.Errorf("failed to release slot: %w", err) + } + } else { + updates := map[string]interface{}{ + "status": types.StatusOpen, + "holder": "", + } + if err := store.UpdateIssue(ctx, slot.ID, updates, actor); err != nil { + return fmt.Errorf("failed to release slot: %w", err) + } + markDirtyAndScheduleFlush() + } + + if jsonOutput { + result := map[string]interface{}{ + "id": slot.ID, + "released": true, + "previous_holder": previousHolder, + "waiters": len(waiters), + } + encoder := json.NewEncoder(os.Stdout) + encoder.SetIndent("", " ") + return encoder.Encode(result) + } + + fmt.Printf("%s Released merge slot: %s\n", ui.RenderPass("✓"), slot.ID) + fmt.Printf(" Previous holder: %s\n", previousHolder) + if len(waiters) > 0 { + fmt.Printf(" Waiters pending: %d\n", len(waiters)) + fmt.Printf(" Next in queue: %s\n", waiters[0]) + } + + return nil +} diff --git a/internal/rpc/protocol.go b/internal/rpc/protocol.go index c5b5d650..347f15ef 100644 --- a/internal/rpc/protocol.go +++ b/internal/rpc/protocol.go @@ -164,6 +164,8 @@ type UpdateArgs struct { // Gate fields AwaitID *string `json:"await_id,omitempty"` // Condition identifier for gates (run ID, PR number, etc.) Waiters []string `json:"waiters,omitempty"` // Mail addresses to notify when gate clears + // Slot fields + Holder *string `json:"holder,omitempty"` // Who currently holds the slot (for type=slot beads) } // CloseArgs represents arguments for the close operation diff --git a/internal/rpc/server_issues_epics.go b/internal/rpc/server_issues_epics.go index e787f0d0..761dbee5 100644 --- a/internal/rpc/server_issues_epics.go +++ b/internal/rpc/server_issues_epics.go @@ -142,6 +142,10 @@ func updatesFromArgs(a UpdateArgs) map[string]interface{} { if len(a.Waiters) > 0 { u["waiters"] = a.Waiters } + // Slot fields + if a.Holder != nil { + u["holder"] = *a.Holder + } return u } diff --git a/internal/types/types.go b/internal/types/types.go index 7e405e4b..9e28a124 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -91,6 +91,9 @@ type Issue struct { Timeout time.Duration `json:"timeout,omitempty"` // Max wait time before escalation Waiters []string `json:"waiters,omitempty"` // Mail addresses to notify when gate clears + // ===== Slot Fields (exclusive access primitives) ===== + Holder string `json:"holder,omitempty"` // Who currently holds the slot (empty = available) + // ===== Source Tracing Fields (formula cooking origin) ===== SourceFormula string `json:"source_formula,omitempty"` // Formula name where step was defined SourceLocation string `json:"source_location,omitempty"` // Path: "steps[0]", "advice[0].after" @@ -163,6 +166,9 @@ func (i *Issue) ComputeContentHash() string { w.str(waiter) } + // Slot fields for exclusive access + w.str(i.Holder) + // Agent identity fields w.str(i.HookBead) w.str(i.RoleBead) @@ -413,12 +419,13 @@ const ( TypeRole IssueType = "role" // Agent role definition TypeConvoy IssueType = "convoy" // Cross-project tracking with reactive completion TypeEvent IssueType = "event" // Operational state change record + TypeSlot IssueType = "slot" // Exclusive access slot (merge-slot gate) ) // IsValid checks if the issue type value is valid func (t IssueType) IsValid() bool { switch t { - case TypeBug, TypeFeature, TypeTask, TypeEpic, TypeChore, TypeMessage, TypeMergeRequest, TypeMolecule, TypeGate, TypeAgent, TypeRole, TypeConvoy, TypeEvent: + case TypeBug, TypeFeature, TypeTask, TypeEpic, TypeChore, TypeMessage, TypeMergeRequest, TypeMolecule, TypeGate, TypeAgent, TypeRole, TypeConvoy, TypeEvent, TypeSlot: return true } return false