From 9cc385debc439958116fafc18169027cdfbeccb1 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Tue, 30 Dec 2025 10:41:51 -0800 Subject: [PATCH] feat: Add --claim flag to bd update for work queue semantics (gt-il2p7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds atomic claim operation for work queue messages: - New --claim flag on bd update command - Sets assignee to claimer and status to in_progress - Fails with clear error if already claimed by someone else - Works in both daemon and direct modes - Includes comprehensive tests for claim functionality 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- cmd/bd/update.go | 27 +++- internal/rpc/protocol.go | 2 + internal/rpc/server_issues_epics.go | 25 +++- internal/rpc/server_mutations_test.go | 201 ++++++++++++++++++++++++++ 4 files changed, 253 insertions(+), 2 deletions(-) diff --git a/cmd/bd/update.go b/cmd/bd/update.go index 9929c4da..45b04e19 100644 --- a/cmd/bd/update.go +++ b/cmd/bd/update.go @@ -122,7 +122,10 @@ create, update, show, or close operation).`, updates["issue_type"] = issueType } - if len(updates) == 0 { + // Get claim flag + claimFlag, _ := cmd.Flags().GetBool("claim") + + if len(updates) == 0 && !claimFlag { fmt.Println("No updates specified") return } @@ -209,6 +212,9 @@ create, update, show, or close operation).`, updateArgs.Parent = &parent } + // Set claim flag for atomic claim operation + updateArgs.Claim = claimFlag + resp, err := daemonClient.Update(updateArgs) if err != nil { fmt.Fprintf(os.Stderr, "Error updating %s: %v\n", id, err) @@ -261,6 +267,24 @@ create, update, show, or close operation).`, continue } + // Handle claim operation atomically + if claimFlag { + // Check if already claimed (has non-empty assignee) + if issue.Assignee != "" { + fmt.Fprintf(os.Stderr, "Error claiming %s: already claimed by %s\n", id, issue.Assignee) + continue + } + // Atomically set assignee and status + claimUpdates := map[string]interface{}{ + "assignee": actor, + "status": "in_progress", + } + if err := store.UpdateIssue(ctx, id, claimUpdates, actor); err != nil { + fmt.Fprintf(os.Stderr, "Error claiming %s: %v\n", id, err) + continue + } + } + // Apply regular field updates if any regularUpdates := make(map[string]interface{}) for k, v := range updates { @@ -387,5 +411,6 @@ func init() { updateCmd.Flags().StringSlice("remove-label", nil, "Remove labels (repeatable)") updateCmd.Flags().StringSlice("set-labels", nil, "Set labels, replacing all existing (repeatable)") updateCmd.Flags().String("parent", "", "New parent issue ID (reparents the issue, use empty string to remove parent)") + updateCmd.Flags().Bool("claim", false, "Atomically claim the issue (sets assignee to you, status to in_progress; fails if already claimed)") rootCmd.AddCommand(updateCmd) } diff --git a/internal/rpc/protocol.go b/internal/rpc/protocol.go index 224738af..53719692 100644 --- a/internal/rpc/protocol.go +++ b/internal/rpc/protocol.go @@ -152,6 +152,8 @@ type UpdateArgs struct { EventActor *string `json:"event_actor,omitempty"` // Entity URI who caused this event EventTarget *string `json:"event_target,omitempty"` // Entity URI or bead ID affected EventPayload *string `json:"event_payload,omitempty"` // Event-specific JSON data + // Work queue claim operation + Claim bool `json:"claim,omitempty"` // If true, atomically claim issue (set assignee+status, fail if already claimed) } // CloseArgs represents arguments for the close operation diff --git a/internal/rpc/server_issues_epics.go b/internal/rpc/server_issues_epics.go index 8dbc3cf5..78f8325b 100644 --- a/internal/rpc/server_issues_epics.go +++ b/internal/rpc/server_issues_epics.go @@ -468,9 +468,32 @@ func (s *Server) handleUpdate(req *Request) Response { } } - updates := updatesFromArgs(updateArgs) actor := s.reqActor(req) + // Handle claim operation atomically + if updateArgs.Claim { + // Check if already claimed (has non-empty assignee) + if issue.Assignee != "" { + return Response{ + Success: false, + Error: fmt.Sprintf("already claimed by %s", issue.Assignee), + } + } + // Atomically set assignee and status + claimUpdates := map[string]interface{}{ + "assignee": actor, + "status": "in_progress", + } + if err := store.UpdateIssue(ctx, updateArgs.ID, claimUpdates, actor); err != nil { + return Response{ + Success: false, + Error: fmt.Sprintf("failed to claim issue: %v", err), + } + } + } + + updates := updatesFromArgs(updateArgs) + // Apply regular field updates if any if len(updates) > 0 { if err := store.UpdateIssue(ctx, updateArgs.ID, updates, actor); err != nil { diff --git a/internal/rpc/server_mutations_test.go b/internal/rpc/server_mutations_test.go index 61631389..b438230c 100644 --- a/internal/rpc/server_mutations_test.go +++ b/internal/rpc/server_mutations_test.go @@ -1053,3 +1053,204 @@ func TestHandleDelete_CascadeAndForceFlags(t *testing.T) { t.Errorf("expected deleted_count=1, got %v", result["deleted_count"]) } } + +// TestHandleUpdate_ClaimFlag verifies atomic claim operation (gt-il2p7) +func TestHandleUpdate_ClaimFlag(t *testing.T) { + store := memory.New("/tmp/test.jsonl") + server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db") + + // Create an issue first + createArgs := CreateArgs{ + Title: "Test Issue for Claim", + IssueType: "task", + Priority: 2, + } + createJSON, _ := json.Marshal(createArgs) + createReq := &Request{ + Operation: OpCreate, + Args: createJSON, + Actor: "test-user", + } + + createResp := server.handleCreate(createReq) + if !createResp.Success { + t.Fatalf("failed to create test issue: %s", createResp.Error) + } + + var createdIssue types.Issue + if err := json.Unmarshal(createResp.Data, &createdIssue); err != nil { + t.Fatalf("failed to parse created issue: %v", err) + } + issueID := createdIssue.ID + + // Verify issue starts with no assignee + if createdIssue.Assignee != "" { + t.Fatalf("expected no assignee initially, got %s", createdIssue.Assignee) + } + + // Claim the issue + updateArgs := UpdateArgs{ + ID: issueID, + Claim: true, + } + updateJSON, _ := json.Marshal(updateArgs) + updateReq := &Request{ + Operation: OpUpdate, + Args: updateJSON, + Actor: "claiming-agent", + } + + updateResp := server.handleUpdate(updateReq) + if !updateResp.Success { + t.Fatalf("claim operation failed: %s", updateResp.Error) + } + + // Verify issue was claimed + var updatedIssue types.Issue + if err := json.Unmarshal(updateResp.Data, &updatedIssue); err != nil { + t.Fatalf("failed to parse updated issue: %v", err) + } + + if updatedIssue.Assignee != "claiming-agent" { + t.Errorf("expected assignee 'claiming-agent', got %s", updatedIssue.Assignee) + } + if updatedIssue.Status != "in_progress" { + t.Errorf("expected status 'in_progress', got %s", updatedIssue.Status) + } +} + +// TestHandleUpdate_ClaimFlag_AlreadyClaimed verifies double-claim returns error +func TestHandleUpdate_ClaimFlag_AlreadyClaimed(t *testing.T) { + store := memory.New("/tmp/test.jsonl") + server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db") + + // Create an issue first + createArgs := CreateArgs{ + Title: "Test Issue for Double Claim", + IssueType: "task", + Priority: 2, + } + createJSON, _ := json.Marshal(createArgs) + createReq := &Request{ + Operation: OpCreate, + Args: createJSON, + Actor: "test-user", + } + + createResp := server.handleCreate(createReq) + if !createResp.Success { + t.Fatalf("failed to create test issue: %s", createResp.Error) + } + + var createdIssue types.Issue + if err := json.Unmarshal(createResp.Data, &createdIssue); err != nil { + t.Fatalf("failed to parse created issue: %v", err) + } + issueID := createdIssue.ID + + // First claim should succeed + updateArgs := UpdateArgs{ + ID: issueID, + Claim: true, + } + updateJSON, _ := json.Marshal(updateArgs) + updateReq := &Request{ + Operation: OpUpdate, + Args: updateJSON, + Actor: "first-claimer", + } + + updateResp := server.handleUpdate(updateReq) + if !updateResp.Success { + t.Fatalf("first claim should succeed: %s", updateResp.Error) + } + + // Second claim should fail + updateArgs2 := UpdateArgs{ + ID: issueID, + Claim: true, + } + updateJSON2, _ := json.Marshal(updateArgs2) + updateReq2 := &Request{ + Operation: OpUpdate, + Args: updateJSON2, + Actor: "second-claimer", + } + + updateResp2 := server.handleUpdate(updateReq2) + if updateResp2.Success { + t.Error("expected second claim to fail, but it succeeded") + } + + // Verify error message + expectedError := "already claimed by first-claimer" + if updateResp2.Error != expectedError { + t.Errorf("expected error %q, got %q", expectedError, updateResp2.Error) + } +} + +// TestHandleUpdate_ClaimFlag_WithOtherUpdates verifies claim can combine with other updates +func TestHandleUpdate_ClaimFlag_WithOtherUpdates(t *testing.T) { + store := memory.New("/tmp/test.jsonl") + server := NewServer("/tmp/test.sock", store, "/tmp", "/tmp/test.db") + + // Create an issue first + createArgs := CreateArgs{ + Title: "Test Issue for Claim with Updates", + IssueType: "task", + Priority: 2, + } + createJSON, _ := json.Marshal(createArgs) + createReq := &Request{ + Operation: OpCreate, + Args: createJSON, + Actor: "test-user", + } + + createResp := server.handleCreate(createReq) + if !createResp.Success { + t.Fatalf("failed to create test issue: %s", createResp.Error) + } + + var createdIssue types.Issue + if err := json.Unmarshal(createResp.Data, &createdIssue); err != nil { + t.Fatalf("failed to parse created issue: %v", err) + } + issueID := createdIssue.ID + + // Claim and update priority at the same time + priority := 0 // High priority + updateArgs := UpdateArgs{ + ID: issueID, + Claim: true, + Priority: &priority, + } + updateJSON, _ := json.Marshal(updateArgs) + updateReq := &Request{ + Operation: OpUpdate, + Args: updateJSON, + Actor: "claiming-agent", + } + + updateResp := server.handleUpdate(updateReq) + if !updateResp.Success { + t.Fatalf("claim with updates failed: %s", updateResp.Error) + } + + // Verify all updates were applied + ctx := context.Background() + issue, err := store.GetIssue(ctx, issueID) + if err != nil { + t.Fatalf("failed to get issue: %v", err) + } + + if issue.Assignee != "claiming-agent" { + t.Errorf("expected assignee 'claiming-agent', got %s", issue.Assignee) + } + if issue.Status != "in_progress" { + t.Errorf("expected status 'in_progress', got %s", issue.Status) + } + if issue.Priority != 0 { + t.Errorf("expected priority 0, got %d", issue.Priority) + } +}