feat: Add Beads MCP Server [bd-5]

Implements MCP server for beads issue tracker, exposing all bd CLI functionality to MCP clients like Claude Desktop.

Features:
- Complete bd command coverage (init, create, list, ready, show, update, close, dep, blocked, stats)
- Type-safe Pydantic models with validation
- Comprehensive test suite (unit + integration tests)
- Production-ready Python package structure
- Environment variable configuration support
- Quickstart resource (beads://quickstart)

Ready for PyPI publication after real-world testing.

Co-authored-by: ghoseb <baishampayan.ghose@gmail.com>
This commit is contained in:
Baishampayan Ghose
2025-10-14 23:43:52 +05:30
committed by GitHub
parent 69cff96d9d
commit 1b1380e6c3
17 changed files with 4733 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Tests for beads-mcp."""

View File

@@ -0,0 +1,612 @@
"""Unit tests for BdClient."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from beads_mcp.bd_client import BdClient, BdCommandError, BdNotFoundError
from beads_mcp.models import (
AddDependencyParams,
CloseIssueParams,
CreateIssueParams,
DependencyType,
IssueStatus,
IssueType,
ListIssuesParams,
ReadyWorkParams,
ShowIssueParams,
UpdateIssueParams,
)
@pytest.fixture
def bd_client():
"""Create a BdClient instance for testing."""
return BdClient(bd_path="/usr/bin/bd", beads_db="/tmp/test.db")
@pytest.fixture
def mock_process():
"""Create a mock subprocess process."""
process = MagicMock()
process.returncode = 0
process.communicate = AsyncMock(return_value=(b"", b""))
return process
@pytest.mark.asyncio
async def test_bd_client_initialization():
"""Test BdClient initialization."""
client = BdClient(bd_path="/usr/bin/bd", beads_db="/tmp/test.db")
assert client.bd_path == "/usr/bin/bd"
assert client.beads_db == "/tmp/test.db"
@pytest.mark.asyncio
async def test_bd_client_without_db():
"""Test BdClient initialization without database."""
client = BdClient(bd_path="/usr/bin/bd")
assert client.bd_path == "/usr/bin/bd"
assert client.beads_db is None
@pytest.mark.asyncio
async def test_run_command_success(bd_client, mock_process):
"""Test successful command execution."""
result_data = {"id": "bd-1", "title": "Test issue"}
mock_process.communicate = AsyncMock(return_value=(json.dumps(result_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client._run_command("show", "bd-1")
assert result == result_data
@pytest.mark.asyncio
async def test_run_command_not_found(bd_client):
"""Test command execution when bd executable not found."""
with (
patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
pytest.raises(BdNotFoundError, match="bd command not found"),
):
await bd_client._run_command("show", "bd-1")
@pytest.mark.asyncio
async def test_run_command_failure(bd_client, mock_process):
"""Test command execution failure."""
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Error: Issue not found"))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="bd command failed"),
):
await bd_client._run_command("show", "bd-999")
@pytest.mark.asyncio
async def test_run_command_invalid_json(bd_client, mock_process):
"""Test command execution with invalid JSON output."""
mock_process.communicate = AsyncMock(return_value=(b"invalid json", b""))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Failed to parse bd JSON output"),
):
await bd_client._run_command("show", "bd-1")
@pytest.mark.asyncio
async def test_run_command_empty_output(bd_client, mock_process):
"""Test command execution with empty output."""
mock_process.communicate = AsyncMock(return_value=(b"", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client._run_command("show", "bd-1")
assert result == {}
@pytest.mark.asyncio
async def test_ready(bd_client, mock_process):
"""Test ready method."""
issues_data = [
{
"id": "bd-1",
"title": "Issue 1",
"status": "open",
"priority": 1,
"issue_type": "bug",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
},
{
"id": "bd-2",
"title": "Issue 2",
"status": "open",
"priority": 2,
"issue_type": "feature",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
},
]
mock_process.communicate = AsyncMock(return_value=(json.dumps(issues_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ReadyWorkParams(limit=10, priority=1)
issues = await bd_client.ready(params)
assert len(issues) == 2
assert issues[0].id == "bd-1"
assert issues[1].id == "bd-2"
@pytest.mark.asyncio
async def test_ready_with_assignee(bd_client, mock_process):
"""Test ready method with assignee filter."""
issues_data = [
{
"id": "bd-1",
"title": "Issue 1",
"status": "open",
"priority": 1,
"issue_type": "bug",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
},
]
mock_process.communicate = AsyncMock(return_value=(json.dumps(issues_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ReadyWorkParams(limit=10, assignee="alice")
issues = await bd_client.ready(params)
assert len(issues) == 1
assert issues[0].id == "bd-1"
@pytest.mark.asyncio
async def test_ready_invalid_response(bd_client, mock_process):
"""Test ready method with invalid response type."""
mock_process.communicate = AsyncMock(
return_value=(json.dumps({"error": "not a list"}).encode(), b"")
)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ReadyWorkParams(limit=10)
issues = await bd_client.ready(params)
assert issues == []
@pytest.mark.asyncio
async def test_list_issues(bd_client, mock_process):
"""Test list_issues method."""
issues_data = [
{
"id": "bd-1",
"title": "Issue 1",
"status": "open",
"priority": 1,
"issue_type": "bug",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
},
]
mock_process.communicate = AsyncMock(return_value=(json.dumps(issues_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ListIssuesParams(status="open", priority=1)
issues = await bd_client.list_issues(params)
assert len(issues) == 1
assert issues[0].id == "bd-1"
@pytest.mark.asyncio
async def test_list_issues_invalid_response(bd_client, mock_process):
"""Test list_issues method with invalid response type."""
mock_process.communicate = AsyncMock(
return_value=(json.dumps({"error": "not a list"}).encode(), b"")
)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ListIssuesParams(status="open")
issues = await bd_client.list_issues(params)
assert issues == []
@pytest.mark.asyncio
async def test_show(bd_client, mock_process):
"""Test show method."""
issue_data = {
"id": "bd-1",
"title": "Test issue",
"description": "Test description",
"status": "open",
"priority": 1,
"issue_type": "bug",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(issue_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = ShowIssueParams(issue_id="bd-1")
issue = await bd_client.show(params)
assert issue.id == "bd-1"
assert issue.title == "Test issue"
@pytest.mark.asyncio
async def test_show_invalid_response(bd_client, mock_process):
"""Test show method with invalid response type."""
mock_process.communicate = AsyncMock(return_value=(json.dumps(["not a dict"]).encode(), b""))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Invalid response for show"),
):
params = ShowIssueParams(issue_id="bd-1")
await bd_client.show(params)
@pytest.mark.asyncio
async def test_create(bd_client, mock_process):
"""Test create method."""
issue_data = {
"id": "bd-5",
"title": "New issue",
"description": "New description",
"status": "open",
"priority": 2,
"issue_type": "feature",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(issue_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = CreateIssueParams(
title="New issue",
description="New description",
priority=2,
issue_type="feature",
)
issue = await bd_client.create(params)
assert issue.id == "bd-5"
assert issue.title == "New issue"
@pytest.mark.asyncio
async def test_create_with_optional_fields(bd_client, mock_process):
"""Test create method with all optional fields."""
issue_data = {
"id": "test-42",
"title": "New issue",
"description": "Full description",
"design": "Design notes",
"acceptance_criteria": "Acceptance criteria",
"external_ref": "gh-123",
"status": "open",
"priority": 1,
"issue_type": "feature",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(issue_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = CreateIssueParams(
title="New issue",
description="Full description",
design="Design notes",
acceptance="Acceptance criteria",
external_ref="gh-123",
priority=1,
issue_type="feature",
id="test-42",
deps=["bd-1", "bd-2"],
)
issue = await bd_client.create(params)
assert issue.id == "test-42"
assert issue.title == "New issue"
@pytest.mark.asyncio
async def test_create_invalid_response(bd_client, mock_process):
"""Test create method with invalid response type."""
mock_process.communicate = AsyncMock(return_value=(json.dumps(["not a dict"]).encode(), b""))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Invalid response for create"),
):
params = CreateIssueParams(title="Test", priority=1, issue_type="task")
await bd_client.create(params)
@pytest.mark.asyncio
async def test_update(bd_client, mock_process):
"""Test update method."""
issue_data = {
"id": "bd-1",
"title": "Updated title",
"status": "in_progress",
"priority": 1,
"issue_type": "bug",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(issue_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = UpdateIssueParams(issue_id="bd-1", status="in_progress", title="Updated title")
issue = await bd_client.update(params)
assert issue.id == "bd-1"
assert issue.status == "in_progress"
@pytest.mark.asyncio
async def test_update_with_optional_fields(bd_client, mock_process):
"""Test update method with all optional fields."""
issue_data = {
"id": "bd-1",
"title": "Updated title",
"design": "Design notes",
"acceptance_criteria": "Acceptance criteria",
"notes": "Additional notes",
"external_ref": "gh-456",
"status": "in_progress",
"priority": 0,
"issue_type": "bug",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(issue_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = UpdateIssueParams(
issue_id="bd-1",
assignee="alice",
design="Design notes",
acceptance_criteria="Acceptance criteria",
notes="Additional notes",
external_ref="gh-456",
)
issue = await bd_client.update(params)
assert issue.id == "bd-1"
assert issue.title == "Updated title"
@pytest.mark.asyncio
async def test_update_invalid_response(bd_client, mock_process):
"""Test update method with invalid response type."""
mock_process.communicate = AsyncMock(return_value=(json.dumps(["not a dict"]).encode(), b""))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Invalid response for update"),
):
params = UpdateIssueParams(issue_id="bd-1", status="in_progress")
await bd_client.update(params)
@pytest.mark.asyncio
async def test_close(bd_client, mock_process):
"""Test close method."""
issues_data = [
{
"id": "bd-1",
"title": "Closed issue",
"status": "closed",
"priority": 1,
"issue_type": "bug",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
"closed_at": "2025-01-25T01:00:00Z",
}
]
mock_process.communicate = AsyncMock(return_value=(json.dumps(issues_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = CloseIssueParams(issue_id="bd-1", reason="Completed")
issues = await bd_client.close(params)
assert len(issues) == 1
assert issues[0].status == "closed"
@pytest.mark.asyncio
async def test_close_invalid_response(bd_client, mock_process):
"""Test close method with invalid response type."""
mock_process.communicate = AsyncMock(
return_value=(json.dumps({"error": "not a list"}).encode(), b"")
)
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Invalid response for close"),
):
params = CloseIssueParams(issue_id="bd-1", reason="Test")
await bd_client.close(params)
@pytest.mark.asyncio
async def test_add_dependency(bd_client, mock_process):
"""Test add_dependency method."""
mock_process.communicate = AsyncMock(return_value=(b"Dependency added\n", b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
params = AddDependencyParams(from_id="bd-2", to_id="bd-1", dep_type="blocks")
await bd_client.add_dependency(params)
# Should complete without raising an exception
@pytest.mark.asyncio
async def test_add_dependency_failure(bd_client, mock_process):
"""Test add_dependency with failure."""
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Dependency already exists"))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="bd dep add failed"),
):
params = AddDependencyParams(from_id="bd-2", to_id="bd-1", dep_type="blocks")
await bd_client.add_dependency(params)
@pytest.mark.asyncio
async def test_add_dependency_not_found(bd_client):
"""Test add_dependency when bd executable not found."""
with (
patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
pytest.raises(BdNotFoundError, match="bd command not found"),
):
params = AddDependencyParams(from_id="bd-2", to_id="bd-1", dep_type="blocks")
await bd_client.add_dependency(params)
@pytest.mark.asyncio
async def test_quickstart(bd_client, mock_process):
"""Test quickstart method."""
quickstart_text = "# Beads Quickstart\n\nWelcome to beads..."
mock_process.communicate = AsyncMock(return_value=(quickstart_text.encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client.quickstart()
assert result == quickstart_text
@pytest.mark.asyncio
async def test_quickstart_failure(bd_client, mock_process):
"""Test quickstart with failure."""
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Command not found"))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="bd quickstart failed"),
):
await bd_client.quickstart()
@pytest.mark.asyncio
async def test_quickstart_not_found(bd_client):
"""Test quickstart when bd executable not found."""
with (
patch("asyncio.create_subprocess_exec", side_effect=FileNotFoundError()),
pytest.raises(BdNotFoundError, match="bd command not found"),
):
await bd_client.quickstart()
@pytest.mark.asyncio
async def test_stats(bd_client, mock_process):
"""Test stats method."""
stats_data = {
"total_issues": 10,
"open_issues": 5,
"in_progress_issues": 2,
"closed_issues": 3,
"blocked_issues": 1,
"ready_issues": 4,
"average_lead_time_hours": 24.5,
}
mock_process.communicate = AsyncMock(return_value=(json.dumps(stats_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client.stats()
assert result.total_issues == 10
assert result.open_issues == 5
@pytest.mark.asyncio
async def test_stats_invalid_response(bd_client, mock_process):
"""Test stats method with invalid response type."""
mock_process.communicate = AsyncMock(return_value=(json.dumps(["not a dict"]).encode(), b""))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="Invalid response for stats"),
):
await bd_client.stats()
@pytest.mark.asyncio
async def test_blocked(bd_client, mock_process):
"""Test blocked method."""
blocked_data = [
{
"id": "bd-1",
"title": "Blocked issue",
"status": "blocked",
"priority": 1,
"issue_type": "bug",
"created_at": "2025-01-25T00:00:00Z",
"updated_at": "2025-01-25T00:00:00Z",
"blocked_by_count": 2,
"blocked_by": ["bd-2", "bd-3"],
}
]
mock_process.communicate = AsyncMock(return_value=(json.dumps(blocked_data).encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client.blocked()
assert len(result) == 1
assert result[0].id == "bd-1"
assert result[0].blocked_by_count == 2
@pytest.mark.asyncio
async def test_blocked_invalid_response(bd_client, mock_process):
"""Test blocked method with invalid response type."""
mock_process.communicate = AsyncMock(
return_value=(json.dumps({"error": "not a list"}).encode(), b"")
)
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
result = await bd_client.blocked()
assert result == []
@pytest.mark.asyncio
async def test_init(bd_client, mock_process):
"""Test init method."""
init_output = "bd initialized successfully!"
mock_process.communicate = AsyncMock(return_value=(init_output.encode(), b""))
with patch("asyncio.create_subprocess_exec", return_value=mock_process):
from beads_mcp.models import InitParams
params = InitParams(prefix="test")
result = await bd_client.init(params)
assert "bd initialized successfully!" in result
@pytest.mark.asyncio
async def test_init_failure(bd_client, mock_process):
"""Test init method with command failure."""
mock_process.returncode = 1
mock_process.communicate = AsyncMock(return_value=(b"", b"Failed to initialize"))
with (
patch("asyncio.create_subprocess_exec", return_value=mock_process),
pytest.raises(BdCommandError, match="bd init failed"),
):
await bd_client.init()

View File

@@ -0,0 +1,351 @@
"""Real integration tests for BdClient using actual bd binary."""
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from beads_mcp.bd_client import BdClient, BdCommandError, BdNotFoundError
from beads_mcp.models import (
AddDependencyParams,
CloseIssueParams,
CreateIssueParams,
DependencyType,
IssueStatus,
IssueType,
ListIssuesParams,
ReadyWorkParams,
ShowIssueParams,
UpdateIssueParams,
)
@pytest.fixture(scope="session")
def bd_executable():
"""Verify bd is available in PATH."""
bd_path = shutil.which("bd")
if not bd_path:
pytest.fail(
"bd executable not found in PATH. "
"Please install bd or add it to your PATH before running integration tests."
)
return bd_path
@pytest.fixture
def temp_db():
"""Create a temporary database file."""
fd, db_path = tempfile.mkstemp(suffix=".db", prefix="beads_test_", dir="/tmp")
os.close(fd)
# Remove the file so bd init can create it
os.unlink(db_path)
yield db_path
# Cleanup
if os.path.exists(db_path):
os.unlink(db_path)
@pytest.fixture
async def bd_client(bd_executable, temp_db):
"""Create BdClient with temporary database - fully hermetic."""
client = BdClient(bd_path=bd_executable, beads_db=temp_db)
# Initialize database with explicit BEADS_DB - no chdir needed!
env = os.environ.copy()
# Clear any existing BEADS_DB to ensure we use only temp_db
env.pop("BEADS_DB", None)
env["BEADS_DB"] = temp_db
import asyncio
# Use temp dir for subprocess to run in (prevents .beads/ discovery)
with tempfile.TemporaryDirectory(prefix="beads_test_workspace_", dir="/tmp") as temp_dir:
process = await asyncio.create_subprocess_exec(
bd_executable,
"init",
"--prefix",
"test",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=temp_dir, # Run in temp dir, not project dir
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
pytest.fail(f"Failed to initialize test database: {stderr.decode()}")
yield client
@pytest.mark.asyncio
async def test_create_and_show_issue(bd_client):
"""Test creating and showing an issue with real bd."""
# Create issue
params = CreateIssueParams(
title="Test integration issue",
description="This is a real integration test",
priority=1,
issue_type="bug",
)
created = await bd_client.create(params)
assert created.id is not None
assert created.title == "Test integration issue"
assert created.description == "This is a real integration test"
assert created.priority == 1
assert created.issue_type == "bug"
assert created.status == "open"
# Show issue
show_params = ShowIssueParams(issue_id=created.id)
shown = await bd_client.show(show_params)
assert shown.id == created.id
assert shown.title == created.title
assert shown.description == created.description
@pytest.mark.asyncio
async def test_list_issues(bd_client):
"""Test listing issues with real bd."""
# Create multiple issues
for i in range(3):
params = CreateIssueParams(
title=f"Test issue {i}",
priority=i,
issue_type="task",
)
await bd_client.create(params)
# List all issues
params = ListIssuesParams()
issues = await bd_client.list_issues(params)
assert len(issues) >= 3
# List with status filter
params = ListIssuesParams(status="open")
issues = await bd_client.list_issues(params)
assert all(issue.status == "open" for issue in issues)
@pytest.mark.asyncio
async def test_update_issue(bd_client):
"""Test updating an issue with real bd."""
# Create issue
create_params = CreateIssueParams(
title="Issue to update",
priority=2,
issue_type="feature",
)
created = await bd_client.create(create_params)
# Update issue
update_params = UpdateIssueParams(
issue_id=created.id,
status="in_progress",
priority=0,
title="Updated title",
)
updated = await bd_client.update(update_params)
assert updated.id == created.id
assert updated.status == "in_progress"
assert updated.priority == 0
assert updated.title == "Updated title"
@pytest.mark.asyncio
async def test_close_issue(bd_client):
"""Test closing an issue with real bd."""
# Create issue
create_params = CreateIssueParams(
title="Issue to close",
priority=1,
issue_type="bug",
)
created = await bd_client.create(create_params)
# Close issue
close_params = CloseIssueParams(issue_id=created.id, reason="Testing complete")
closed_issues = await bd_client.close(close_params)
assert len(closed_issues) >= 1
closed = closed_issues[0]
assert closed.id == created.id
assert closed.status == "closed"
assert closed.closed_at is not None
@pytest.mark.asyncio
async def test_add_dependency(bd_client):
"""Test adding dependencies with real bd."""
# Create two issues
issue1 = await bd_client.create(
CreateIssueParams(title="Issue 1", priority=1, issue_type="task")
)
issue2 = await bd_client.create(
CreateIssueParams(title="Issue 2", priority=1, issue_type="task")
)
# Add dependency: issue2 blocks issue1
params = AddDependencyParams(
from_id=issue1.id, to_id=issue2.id, dep_type="blocks"
)
await bd_client.add_dependency(params)
# Verify dependency by showing issue1
show_params = ShowIssueParams(issue_id=issue1.id)
shown = await bd_client.show(show_params)
assert len(shown.dependencies) > 0
assert any(dep.id == issue2.id for dep in shown.dependencies)
@pytest.mark.asyncio
async def test_ready_work(bd_client):
"""Test getting ready work with real bd."""
# Create issue with no dependencies (should be ready)
ready_issue = await bd_client.create(
CreateIssueParams(title="Ready issue", priority=1, issue_type="task")
)
# Create blocked issue
blocking_issue = await bd_client.create(
CreateIssueParams(title="Blocking issue", priority=1, issue_type="task")
)
blocked_issue = await bd_client.create(
CreateIssueParams(title="Blocked issue", priority=1, issue_type="task")
)
# Add blocking dependency
await bd_client.add_dependency(
AddDependencyParams(
from_id=blocked_issue.id,
to_id=blocking_issue.id,
dep_type="blocks",
)
)
# Get ready work
params = ReadyWorkParams(limit=100)
ready_issues = await bd_client.ready(params)
# ready_issue should be in ready work
ready_ids = [issue.id for issue in ready_issues]
assert ready_issue.id in ready_ids
# blocked_issue should NOT be in ready work
assert blocked_issue.id not in ready_ids
@pytest.mark.asyncio
async def test_quickstart(bd_client):
"""Test quickstart command with real bd."""
result = await bd_client.quickstart()
assert len(result) > 0
assert "beads" in result.lower() or "bd" in result.lower()
@pytest.mark.asyncio
async def test_create_with_labels(bd_client):
"""Test creating issue with labels."""
params = CreateIssueParams(
title="Issue with labels",
priority=1,
issue_type="feature",
labels=["urgent", "backend"],
)
created = await bd_client.create(params)
# Note: bd currently doesn't return labels in JSON output
# This test verifies the command succeeds with labels parameter
assert created.id is not None
assert created.title == "Issue with labels"
@pytest.mark.asyncio
async def test_create_with_assignee(bd_client):
"""Test creating issue with assignee."""
params = CreateIssueParams(
title="Assigned issue",
priority=1,
issue_type="task",
assignee="testuser",
)
created = await bd_client.create(params)
assert created.assignee == "testuser"
@pytest.mark.asyncio
async def test_list_with_filters(bd_client):
"""Test listing issues with multiple filters."""
# Create issues with different attributes
await bd_client.create(
CreateIssueParams(
title="Bug P0",
priority=0,
issue_type="bug",
assignee="alice",
)
)
await bd_client.create(
CreateIssueParams(
title="Feature P1",
priority=1,
issue_type="feature",
assignee="bob",
)
)
# Filter by priority
params = ListIssuesParams(priority=0)
issues = await bd_client.list_issues(params)
assert all(issue.priority == 0 for issue in issues)
# Filter by type
params = ListIssuesParams(issue_type="bug")
issues = await bd_client.list_issues(params)
assert all(issue.issue_type == "bug" for issue in issues)
# Filter by assignee
params = ListIssuesParams(assignee="alice")
issues = await bd_client.list_issues(params)
assert all(issue.assignee == "alice" for issue in issues)
@pytest.mark.asyncio
async def test_invalid_issue_id(bd_client):
"""Test showing non-existent issue."""
params = ShowIssueParams(issue_id="test-999")
with pytest.raises(BdCommandError, match="bd command failed"):
await bd_client.show(params)
@pytest.mark.asyncio
async def test_dependency_types(bd_client):
"""Test different dependency types."""
issue1 = await bd_client.create(
CreateIssueParams(title="Issue 1", priority=1, issue_type="task")
)
issue2 = await bd_client.create(
CreateIssueParams(title="Issue 2", priority=1, issue_type="task")
)
# Test related dependency
params = AddDependencyParams(
from_id=issue1.id, to_id=issue2.id, dep_type="related"
)
await bd_client.add_dependency(params)
# Verify
show_params = ShowIssueParams(issue_id=issue1.id)
shown = await bd_client.show(show_params)
assert len(shown.dependencies) > 0

View File

@@ -0,0 +1,524 @@
"""Real integration tests for MCP server using fastmcp.Client."""
import os
import shutil
import tempfile
import pytest
from fastmcp.client import Client
from beads_mcp.server import mcp
@pytest.fixture(scope="session")
def bd_executable():
"""Verify bd is available in PATH."""
bd_path = shutil.which("bd")
if not bd_path:
pytest.fail(
"bd executable not found in PATH. "
"Please install bd or add it to your PATH before running integration tests."
)
return bd_path
@pytest.fixture
async def temp_db(bd_executable):
"""Create a temporary database file and initialize it - fully hermetic."""
# Create temp directory for database
temp_dir = tempfile.mkdtemp(prefix="beads_mcp_test_", dir="/tmp")
db_path = os.path.join(temp_dir, "test.db")
# Initialize database with explicit BEADS_DB - no chdir needed!
import asyncio
env = os.environ.copy()
# Clear any existing BEADS_DB to ensure we use only temp db
env.pop("BEADS_DB", None)
env["BEADS_DB"] = db_path
# Use temp workspace dir for subprocess (prevents .beads/ discovery)
with tempfile.TemporaryDirectory(
prefix="beads_mcp_test_workspace_", dir="/tmp"
) as temp_workspace:
process = await asyncio.create_subprocess_exec(
bd_executable,
"init",
"--prefix",
"test",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
cwd=temp_workspace, # Run in temp workspace, not project dir
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
pytest.fail(f"Failed to initialize test database: {stderr.decode()}")
yield db_path
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.fixture
async def mcp_client(bd_executable, temp_db, monkeypatch):
"""Create MCP client with temporary database."""
from beads_mcp import tools
from beads_mcp.bd_client import BdClient
# Reset client before test
tools._client = None
# Create a pre-configured client with explicit paths (bypasses config loading)
tools._client = BdClient(bd_path=bd_executable, beads_db=temp_db)
# Create test client
async with Client(mcp) as client:
yield client
# Reset client after test
tools._client = None
@pytest.mark.asyncio
async def test_quickstart_resource(mcp_client):
"""Test beads://quickstart resource."""
result = await mcp_client.read_resource("beads://quickstart")
assert result is not None
content = result[0].text
assert len(content) > 0
assert "beads" in content.lower() or "bd" in content.lower()
@pytest.mark.asyncio
async def test_create_issue_tool(mcp_client):
"""Test create_issue tool."""
result = await mcp_client.call_tool(
"create",
{
"title": "Test MCP issue",
"description": "Created via MCP server",
"priority": 1,
"issue_type": "bug",
},
)
# Parse the JSON response from CallToolResult
import json
issue_data = json.loads(result.content[0].text)
assert issue_data["title"] == "Test MCP issue"
assert issue_data["description"] == "Created via MCP server"
assert issue_data["priority"] == 1
assert issue_data["issue_type"] == "bug"
assert issue_data["status"] == "open"
assert "id" in issue_data
return issue_data["id"]
@pytest.mark.asyncio
async def test_show_issue_tool(mcp_client):
"""Test show_issue tool."""
# First create an issue
create_result = await mcp_client.call_tool(
"create",
{"title": "Issue to show", "priority": 2, "issue_type": "task"},
)
import json
created = json.loads(create_result.content[0].text)
issue_id = created["id"]
# Show the issue
show_result = await mcp_client.call_tool("show", {"issue_id": issue_id})
issue = json.loads(show_result.content[0].text)
assert issue["id"] == issue_id
assert issue["title"] == "Issue to show"
@pytest.mark.asyncio
async def test_list_issues_tool(mcp_client):
"""Test list_issues tool."""
# Create some issues first
await mcp_client.call_tool(
"create", {"title": "Issue 1", "priority": 0, "issue_type": "bug"}
)
await mcp_client.call_tool(
"create", {"title": "Issue 2", "priority": 1, "issue_type": "feature"}
)
# List all issues
result = await mcp_client.call_tool("list", {})
import json
issues = json.loads(result.content[0].text)
assert len(issues) >= 2
# List with status filter
result = await mcp_client.call_tool("list", {"status": "open"})
issues = json.loads(result.content[0].text)
assert all(issue["status"] == "open" for issue in issues)
@pytest.mark.asyncio
async def test_update_issue_tool(mcp_client):
"""Test update_issue tool."""
import json
# Create issue
create_result = await mcp_client.call_tool(
"create", {"title": "Issue to update", "priority": 2, "issue_type": "task"}
)
created = json.loads(create_result.content[0].text)
issue_id = created["id"]
# Update issue
update_result = await mcp_client.call_tool(
"update",
{
"issue_id": issue_id,
"status": "in_progress",
"priority": 0,
"title": "Updated title",
},
)
updated = json.loads(update_result.content[0].text)
assert updated["id"] == issue_id
assert updated["status"] == "in_progress"
assert updated["priority"] == 0
assert updated["title"] == "Updated title"
@pytest.mark.asyncio
async def test_close_issue_tool(mcp_client):
"""Test close_issue tool."""
import json
# Create issue
create_result = await mcp_client.call_tool(
"create", {"title": "Issue to close", "priority": 1, "issue_type": "bug"}
)
created = json.loads(create_result.content[0].text)
issue_id = created["id"]
# Close issue
close_result = await mcp_client.call_tool(
"close", {"issue_id": issue_id, "reason": "Test complete"}
)
closed_issues = json.loads(close_result.content[0].text)
assert len(closed_issues) >= 1
closed = closed_issues[0]
assert closed["id"] == issue_id
assert closed["status"] == "closed"
assert closed["closed_at"] is not None
@pytest.mark.asyncio
async def test_ready_work_tool(mcp_client):
"""Test ready_work tool."""
import json
# Create a ready issue (no dependencies)
ready_result = await mcp_client.call_tool(
"create", {"title": "Ready work", "priority": 1, "issue_type": "task"}
)
ready_issue = json.loads(ready_result.content[0].text)
# Create blocked issue
blocking_result = await mcp_client.call_tool(
"create", {"title": "Blocking issue", "priority": 1, "issue_type": "task"}
)
blocking_issue = json.loads(blocking_result.content[0].text)
blocked_result = await mcp_client.call_tool(
"create", {"title": "Blocked issue", "priority": 1, "issue_type": "task"}
)
blocked_issue = json.loads(blocked_result.content[0].text)
# Add blocking dependency
await mcp_client.call_tool(
"dep",
{
"from_id": blocked_issue["id"],
"to_id": blocking_issue["id"],
"dep_type": "blocks",
},
)
# Get ready work
result = await mcp_client.call_tool("ready", {"limit": 100})
ready_issues = json.loads(result.content[0].text)
ready_ids = [issue["id"] for issue in ready_issues]
assert ready_issue["id"] in ready_ids
assert blocked_issue["id"] not in ready_ids
@pytest.mark.asyncio
async def test_add_dependency_tool(mcp_client):
"""Test add_dependency tool."""
import json
# Create two issues
issue1_result = await mcp_client.call_tool(
"create", {"title": "Issue 1", "priority": 1, "issue_type": "task"}
)
issue1 = json.loads(issue1_result.content[0].text)
issue2_result = await mcp_client.call_tool(
"create", {"title": "Issue 2", "priority": 1, "issue_type": "task"}
)
issue2 = json.loads(issue2_result.content[0].text)
# Add dependency
result = await mcp_client.call_tool(
"dep",
{"from_id": issue1["id"], "to_id": issue2["id"], "dep_type": "blocks"},
)
message = result.content[0].text
assert "Added dependency" in message
assert issue1["id"] in message
assert issue2["id"] in message
@pytest.mark.asyncio
async def test_create_with_all_fields(mcp_client):
"""Test create_issue with all optional fields."""
import json
result = await mcp_client.call_tool(
"create",
{
"title": "Full issue",
"description": "Complete description",
"priority": 0,
"issue_type": "feature",
"assignee": "testuser",
"labels": ["urgent", "backend"],
},
)
issue = json.loads(result.content[0].text)
assert issue["title"] == "Full issue"
assert issue["description"] == "Complete description"
assert issue["priority"] == 0
assert issue["issue_type"] == "feature"
assert issue["assignee"] == "testuser"
@pytest.mark.asyncio
async def test_list_with_filters(mcp_client):
"""Test list_issues with various filters."""
import json
# Create issues with different attributes
await mcp_client.call_tool(
"create",
{
"title": "Bug P0",
"priority": 0,
"issue_type": "bug",
"assignee": "alice",
},
)
await mcp_client.call_tool(
"create",
{
"title": "Feature P1",
"priority": 1,
"issue_type": "feature",
"assignee": "bob",
},
)
# Filter by priority
result = await mcp_client.call_tool("list", {"priority": 0})
issues = json.loads(result.content[0].text)
assert all(issue["priority"] == 0 for issue in issues)
# Filter by type
result = await mcp_client.call_tool("list", {"issue_type": "bug"})
issues = json.loads(result.content[0].text)
assert all(issue["issue_type"] == "bug" for issue in issues)
# Filter by assignee
result = await mcp_client.call_tool("list", {"assignee": "alice"})
issues = json.loads(result.content[0].text)
assert all(issue["assignee"] == "alice" for issue in issues)
@pytest.mark.asyncio
async def test_ready_work_with_priority_filter(mcp_client):
"""Test ready_work with priority filter."""
import json
# Create issues with different priorities
await mcp_client.call_tool(
"create", {"title": "P0 issue", "priority": 0, "issue_type": "bug"}
)
await mcp_client.call_tool(
"create", {"title": "P1 issue", "priority": 1, "issue_type": "task"}
)
# Get ready work with priority filter
result = await mcp_client.call_tool("ready", {"priority": 0, "limit": 100})
issues = json.loads(result.content[0].text)
assert all(issue["priority"] == 0 for issue in issues)
@pytest.mark.asyncio
async def test_update_partial_fields(mcp_client):
"""Test update_issue with partial field updates."""
import json
# Create issue
create_result = await mcp_client.call_tool(
"create",
{
"title": "Original title",
"description": "Original description",
"priority": 2,
"issue_type": "task",
},
)
created = json.loads(create_result.content[0].text)
issue_id = created["id"]
# Update only status
update_result = await mcp_client.call_tool(
"update", {"issue_id": issue_id, "status": "in_progress"}
)
updated = json.loads(update_result.content[0].text)
assert updated["status"] == "in_progress"
assert updated["title"] == "Original title" # Unchanged
assert updated["priority"] == 2 # Unchanged
@pytest.mark.asyncio
async def test_dependency_types(mcp_client):
"""Test different dependency types."""
import json
# Create issues
issue1_result = await mcp_client.call_tool(
"create", {"title": "Issue 1", "priority": 1, "issue_type": "task"}
)
issue1 = json.loads(issue1_result.content[0].text)
issue2_result = await mcp_client.call_tool(
"create", {"title": "Issue 2", "priority": 1, "issue_type": "task"}
)
issue2 = json.loads(issue2_result.content[0].text)
# Test related dependency
result = await mcp_client.call_tool(
"dep",
{"from_id": issue1["id"], "to_id": issue2["id"], "dep_type": "related"},
)
message = result.content[0].text
assert "Added dependency" in message
assert "related" in message
@pytest.mark.asyncio
async def test_stats_tool(mcp_client):
"""Test stats tool."""
import json
# Create some issues to get stats
await mcp_client.call_tool(
"create", {"title": "Stats test 1", "priority": 1, "issue_type": "bug"}
)
await mcp_client.call_tool(
"create", {"title": "Stats test 2", "priority": 2, "issue_type": "task"}
)
# Get stats
result = await mcp_client.call_tool("stats", {})
stats = json.loads(result.content[0].text)
assert "total_issues" in stats
assert "open_issues" in stats
assert stats["total_issues"] >= 2
@pytest.mark.asyncio
async def test_blocked_tool(mcp_client):
"""Test blocked tool."""
import json
# Create two issues
blocking_result = await mcp_client.call_tool(
"create", {"title": "Blocking issue", "priority": 1, "issue_type": "task"}
)
blocking_issue = json.loads(blocking_result.content[0].text)
blocked_result = await mcp_client.call_tool(
"create", {"title": "Blocked issue", "priority": 1, "issue_type": "task"}
)
blocked_issue = json.loads(blocked_result.content[0].text)
# Add blocking dependency
await mcp_client.call_tool(
"dep",
{
"from_id": blocked_issue["id"],
"to_id": blocking_issue["id"],
"dep_type": "blocks",
},
)
# Get blocked issues
result = await mcp_client.call_tool("blocked", {})
blocked_issues = json.loads(result.content[0].text)
# Should have at least the one we created
blocked_ids = [issue["id"] for issue in blocked_issues]
assert blocked_issue["id"] in blocked_ids
# Find our blocked issue and verify it has blocking info
our_blocked = next(issue for issue in blocked_issues if issue["id"] == blocked_issue["id"])
assert our_blocked["blocked_by_count"] >= 1
assert blocking_issue["id"] in our_blocked["blocked_by"]
@pytest.mark.asyncio
async def test_init_tool(mcp_client, bd_executable):
"""Test init tool."""
import os
import tempfile
# Create a completely separate temp directory and database
with tempfile.TemporaryDirectory(prefix="beads_init_test_", dir="/tmp") as temp_dir:
new_db_path = os.path.join(temp_dir, "new_test.db")
# Temporarily override the client's BEADS_DB for this test
from beads_mcp import tools
# Save original client
original_client = tools._client
# Create a new client pointing to the new database path
from beads_mcp.bd_client import BdClient
tools._client = BdClient(bd_path=bd_executable, beads_db=new_db_path)
try:
# Call init tool
result = await mcp_client.call_tool("init", {"prefix": "test-init"})
output = result.content[0].text
# Verify output contains success message
assert "bd initialized successfully!" in output
finally:
# Restore original client
tools._client = original_client

View File

@@ -0,0 +1,364 @@
"""Integration tests for MCP tools."""
from unittest.mock import AsyncMock, patch
import pytest
from beads_mcp.bd_client import BdClient
from beads_mcp.models import BlockedIssue, Issue, IssueStatus, IssueType, Stats
from beads_mcp.tools import (
beads_add_dependency,
beads_blocked,
beads_close_issue,
beads_create_issue,
beads_init,
beads_list_issues,
beads_quickstart,
beads_ready_work,
beads_show_issue,
beads_stats,
beads_update_issue,
)
@pytest.fixture(autouse=True)
def mock_client():
"""Mock the BdClient for all tests."""
from beads_mcp import tools
# Reset client before each test
tools._client = None
yield
# Reset client after each test
tools._client = None
@pytest.fixture
def sample_issue():
"""Create a sample issue for testing."""
return Issue(
id="bd-1",
title="Test issue",
description="Test description",
status="open",
priority=1,
issue_type="bug",
created_at="2024-01-01T00:00:00Z",
updated_at="2024-01-01T00:00:00Z",
)
@pytest.mark.asyncio
async def test_beads_ready_work(sample_issue):
"""Test beads_ready_work tool."""
mock_client = AsyncMock()
mock_client.ready = AsyncMock(return_value=[sample_issue])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issues = await beads_ready_work(limit=10, priority=1)
assert len(issues) == 1
assert issues[0].id == "bd-1"
mock_client.ready.assert_called_once()
@pytest.mark.asyncio
async def test_beads_ready_work_no_params():
"""Test beads_ready_work with default parameters."""
mock_client = AsyncMock()
mock_client.ready = AsyncMock(return_value=[])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issues = await beads_ready_work()
assert len(issues) == 0
mock_client.ready.assert_called_once()
@pytest.mark.asyncio
async def test_beads_list_issues(sample_issue):
"""Test beads_list_issues tool."""
mock_client = AsyncMock()
mock_client.list_issues = AsyncMock(return_value=[sample_issue])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issues = await beads_list_issues(status="open", priority=1)
assert len(issues) == 1
assert issues[0].id == "bd-1"
mock_client.list_issues.assert_called_once()
@pytest.mark.asyncio
async def test_beads_show_issue(sample_issue):
"""Test beads_show_issue tool."""
mock_client = AsyncMock()
mock_client.show = AsyncMock(return_value=sample_issue)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issue = await beads_show_issue(issue_id="bd-1")
assert issue.id == "bd-1"
assert issue.title == "Test issue"
mock_client.show.assert_called_once()
@pytest.mark.asyncio
async def test_beads_create_issue(sample_issue):
"""Test beads_create_issue tool."""
mock_client = AsyncMock()
mock_client.create = AsyncMock(return_value=sample_issue)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issue = await beads_create_issue(
title="New issue",
description="New description",
priority=2,
issue_type="feature",
)
assert issue.id == "bd-1"
mock_client.create.assert_called_once()
@pytest.mark.asyncio
async def test_beads_create_issue_with_labels(sample_issue):
"""Test beads_create_issue with labels."""
mock_client = AsyncMock()
mock_client.create = AsyncMock(return_value=sample_issue)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issue = await beads_create_issue(
title="New issue", labels=["bug", "urgent"]
)
assert issue.id == "bd-1"
mock_client.create.assert_called_once()
@pytest.mark.asyncio
async def test_beads_update_issue(sample_issue):
"""Test beads_update_issue tool."""
updated_issue = sample_issue.model_copy(
update={"status": "in_progress"}
)
mock_client = AsyncMock()
mock_client.update = AsyncMock(return_value=updated_issue)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issue = await beads_update_issue(issue_id="bd-1", status="in_progress")
assert issue.status == "in_progress"
mock_client.update.assert_called_once()
@pytest.mark.asyncio
async def test_beads_close_issue(sample_issue):
"""Test beads_close_issue tool."""
closed_issue = sample_issue.model_copy(
update={"status": "closed", "closed_at": "2024-01-02T00:00:00Z"}
)
mock_client = AsyncMock()
mock_client.close = AsyncMock(return_value=[closed_issue])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issues = await beads_close_issue(issue_id="bd-1", reason="Completed")
assert len(issues) == 1
assert issues[0].status == "closed"
mock_client.close.assert_called_once()
@pytest.mark.asyncio
async def test_beads_add_dependency_success():
"""Test beads_add_dependency tool success."""
mock_client = AsyncMock()
mock_client.add_dependency = AsyncMock(return_value=None)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_add_dependency(
from_id="bd-2", to_id="bd-1", dep_type="blocks"
)
assert "Added dependency" in result
assert "bd-2" in result
assert "bd-1" in result
mock_client.add_dependency.assert_called_once()
@pytest.mark.asyncio
async def test_beads_add_dependency_error():
"""Test beads_add_dependency tool error handling."""
from beads_mcp.bd_client import BdError
mock_client = AsyncMock()
mock_client.add_dependency = AsyncMock(
side_effect=BdError("Dependency already exists")
)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_add_dependency(
from_id="bd-2", to_id="bd-1", dep_type="blocks"
)
assert "Error" in result
mock_client.add_dependency.assert_called_once()
@pytest.mark.asyncio
async def test_beads_quickstart():
"""Test beads_quickstart tool."""
quickstart_text = "# Beads Quickstart\n\nWelcome to beads..."
mock_client = AsyncMock()
mock_client.quickstart = AsyncMock(return_value=quickstart_text)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_quickstart()
assert "Beads Quickstart" in result
mock_client.quickstart.assert_called_once()
@pytest.mark.asyncio
async def test_client_lazy_initialization():
"""Test that client is lazily initialized on first use."""
from beads_mcp import tools
# Clear client
tools._client = None
# Verify client is None before first use
assert tools._client is None
# Mock BdClient to avoid actual bd calls
mock_client_instance = AsyncMock()
mock_client_instance.ready = AsyncMock(return_value=[])
with patch("beads_mcp.tools.BdClient") as MockBdClient:
MockBdClient.return_value = mock_client_instance
# First call should initialize client
await beads_ready_work()
# Verify BdClient was instantiated
MockBdClient.assert_called_once()
# Verify client is now set
assert tools._client is not None
# Second call should reuse client
MockBdClient.reset_mock()
await beads_ready_work()
# Verify BdClient was NOT called again
MockBdClient.assert_not_called()
@pytest.mark.asyncio
async def test_list_issues_with_all_filters(sample_issue):
"""Test beads_list_issues with all filter parameters."""
mock_client = AsyncMock()
mock_client.list_issues = AsyncMock(return_value=[sample_issue])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issues = await beads_list_issues(
status="open",
priority=1,
issue_type="bug",
assignee="user1",
limit=100,
)
assert len(issues) == 1
mock_client.list_issues.assert_called_once()
@pytest.mark.asyncio
async def test_update_issue_multiple_fields(sample_issue):
"""Test beads_update_issue with multiple fields."""
updated_issue = sample_issue.model_copy(
update={
"status": "in_progress",
"priority": 0,
"title": "Updated title",
}
)
mock_client = AsyncMock()
mock_client.update = AsyncMock(return_value=updated_issue)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
issue = await beads_update_issue(
issue_id="bd-1",
status="in_progress",
priority=0,
title="Updated title",
)
assert issue.status == "in_progress"
assert issue.priority == 0
assert issue.title == "Updated title"
mock_client.update.assert_called_once()
@pytest.mark.asyncio
async def test_beads_stats():
"""Test beads_stats tool."""
stats_data = Stats(
total_issues=10,
open_issues=5,
in_progress_issues=2,
closed_issues=3,
blocked_issues=1,
ready_issues=4,
average_lead_time_hours=24.5,
)
mock_client = AsyncMock()
mock_client.stats = AsyncMock(return_value=stats_data)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_stats()
assert result.total_issues == 10
assert result.open_issues == 5
mock_client.stats.assert_called_once()
@pytest.mark.asyncio
async def test_beads_blocked():
"""Test beads_blocked tool."""
blocked_issue = BlockedIssue(
id="bd-1",
title="Blocked issue",
description="",
status="blocked",
priority=1,
issue_type="bug",
created_at="2024-01-01T00:00:00Z",
updated_at="2024-01-01T00:00:00Z",
blocked_by_count=2,
blocked_by=["bd-2", "bd-3"],
)
mock_client = AsyncMock()
mock_client.blocked = AsyncMock(return_value=[blocked_issue])
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_blocked()
assert len(result) == 1
assert result[0].id == "bd-1"
assert result[0].blocked_by_count == 2
mock_client.blocked.assert_called_once()
@pytest.mark.asyncio
async def test_beads_init():
"""Test beads_init tool."""
init_output = "bd initialized successfully!"
mock_client = AsyncMock()
mock_client.init = AsyncMock(return_value=init_output)
with patch("beads_mcp.tools._get_client", return_value=mock_client):
result = await beads_init(prefix="test")
assert "bd initialized successfully!" in result
mock_client.init.assert_called_once()