Merge remote-tracking branch 'origin/main'

Resolved JSONL conflicts using bd merge tool.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-11-20 19:31:00 -05:00
17 changed files with 3402 additions and 533 deletions

File diff suppressed because one or more lines are too long

View File

@@ -162,6 +162,12 @@ With --no-db: creates .beads/ directory and issues.jsonl file instead of SQLite
// Non-fatal - continue anyway // Non-fatal - continue anyway
} }
// Create README.md
if err := createReadme(localBeadsDir); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create README.md: %v\n", err)
// Non-fatal - continue anyway
}
if !quiet { if !quiet {
green := color.New(color.FgGreen).SprintFunc() green := color.New(color.FgGreen).SprintFunc()
cyan := color.New(color.FgCyan).SprintFunc() cyan := color.New(color.FgCyan).SprintFunc()
@@ -263,6 +269,12 @@ With --no-db: creates .beads/ directory and issues.jsonl file instead of SQLite
fmt.Fprintf(os.Stderr, "Warning: failed to create config.yaml: %v\n", err) fmt.Fprintf(os.Stderr, "Warning: failed to create config.yaml: %v\n", err)
// Non-fatal - continue anyway // Non-fatal - continue anyway
} }
// Create README.md
if err := createReadme(localBeadsDir); err != nil {
fmt.Fprintf(os.Stderr, "Warning: failed to create README.md: %v\n", err)
// Non-fatal - continue anyway
}
} }
// Check if git has existing issues to import (fresh clone scenario) // Check if git has existing issues to import (fresh clone scenario)
@@ -963,6 +975,107 @@ func createConfigYaml(beadsDir string, noDbMode bool) error {
return nil return nil
} }
// createReadme creates the README.md file in the .beads directory.
//
// The README is written once per workspace: if the file already exists it is
// left untouched so any user edits survive re-initialization. The returned
// error is non-nil only when writing a brand-new README fails; callers treat
// it as a non-fatal warning.
func createReadme(beadsDir string) error {
	readmePath := filepath.Join(beadsDir, "README.md")

	// Skip if already exists
	// NOTE(review): Stat-then-write leaves a small TOCTOU window — a README
	// created concurrently between these two calls would be overwritten.
	// Benign for an init-time helper, but worth confirming.
	if _, err := os.Stat(readmePath); err == nil {
		return nil
	}

	// Static template; fenced code blocks and inline code are spliced in via
	// string concatenation so the backtick raw string stays valid Go.
	readmeTemplate := `# Beads - AI-Native Issue Tracking
Welcome to Beads! This repository uses **Beads** for issue tracking - a modern, AI-native tool designed to live directly in your codebase alongside your code.
## What is Beads?
Beads is issue tracking that lives in your repo, making it perfect for AI coding agents and developers who want their issues close to their code. No web UI required - everything works through the CLI and integrates seamlessly with git.
**Learn more:** [github.com/steveyegge/beads](https://github.com/steveyegge/beads)
## Quick Start
### Essential Commands
` + "```bash" + `
# Create new issues
bd create "Add user authentication"
# View all issues
bd list
# View issue details
bd show <issue-id>
# Update issue status
bd update <issue-id> --status in-progress
bd update <issue-id> --status done
# Sync with git remote
bd sync
` + "```" + `
### Working with Issues
Issues in Beads are:
- **Git-native**: Stored in ` + "`.beads/issues.jsonl`" + ` and synced like code
- **AI-friendly**: CLI-first design works perfectly with AI coding agents
- **Branch-aware**: Issues can follow your branch workflow
- **Always in sync**: Auto-syncs with your commits
## Why Beads?
✨ **AI-Native Design**
- Built specifically for AI-assisted development workflows
- CLI-first interface works seamlessly with AI coding agents
- No context switching to web UIs
🚀 **Developer Focused**
- Issues live in your repo, right next to your code
- Works offline, syncs when you push
- Fast, lightweight, and stays out of your way
🔧 **Git Integration**
- Automatic sync with git commits
- Branch-aware issue tracking
- Intelligent JSONL merge resolution
## Get Started with Beads
Try Beads in your own projects:
` + "```bash" + `
# Install Beads
curl -sSL https://raw.githubusercontent.com/steveyegge/beads/main/scripts/install.sh | bash
# Initialize in your repo
bd init
# Create your first issue
bd create "Try out Beads"
` + "```" + `
## Learn More
- **Documentation**: [github.com/steveyegge/beads/docs](https://github.com/steveyegge/beads/tree/main/docs)
- **Quick Start Guide**: Run ` + "`bd quickstart`" + `
- **Examples**: [github.com/steveyegge/beads/examples](https://github.com/steveyegge/beads/tree/main/examples)
---
*Beads: Issue tracking that moves at the speed of thought* ⚡
`

	// Write README.md (0644 is standard for markdown files)
	// #nosec G306 - README needs to be readable
	if err := os.WriteFile(readmePath, []byte(readmeTemplate), 0644); err != nil {
		return fmt.Errorf("failed to write README.md: %w", err)
	}
	return nil
}
// readFirstIssueFromJSONL reads the first issue from a JSONL file // readFirstIssueFromJSONL reads the first issue from a JSONL file
func readFirstIssueFromJSONL(path string) (*types.Issue, error) { func readFirstIssueFromJSONL(path string) (*types.Issue, error) {
// #nosec G304 -- helper reads JSONL file chosen by current bd command // #nosec G304 -- helper reads JSONL file chosen by current bd command

View File

@@ -89,4 +89,5 @@ dev = [
"pytest-asyncio>=1.2.0", "pytest-asyncio>=1.2.0",
"pytest-cov>=7.0.0", "pytest-cov>=7.0.0",
"ruff>=0.14.0", "ruff>=0.14.0",
"types-requests>=2.31.0",
] ]

View File

@@ -5,7 +5,7 @@ import json
import os import os
import re import re
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Optional from typing import Any, List, Optional
from .config import load_config from .config import load_config
from .models import ( from .models import (
@@ -114,6 +114,11 @@ class BdClientBase(ABC):
"""Add a dependency between issues.""" """Add a dependency between issues."""
pass pass
@abstractmethod
async def quickstart(self) -> str:
"""Get quickstart guide."""
pass
@abstractmethod @abstractmethod
async def stats(self) -> Stats: async def stats(self) -> Stats:
"""Get repository statistics.""" """Get repository statistics."""
@@ -130,17 +135,17 @@ class BdClientBase(ABC):
pass pass
@abstractmethod @abstractmethod
async def inspect_migration(self) -> dict: async def inspect_migration(self) -> dict[str, Any]:
"""Get migration plan and database state for agent analysis.""" """Get migration plan and database state for agent analysis."""
pass pass
@abstractmethod @abstractmethod
async def get_schema_info(self) -> dict: async def get_schema_info(self) -> dict[str, Any]:
"""Get current database schema for inspection.""" """Get current database schema for inspection."""
pass pass
@abstractmethod @abstractmethod
async def repair_deps(self, fix: bool = False) -> dict: async def repair_deps(self, fix: bool = False) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references. """Find and optionally fix orphaned dependency references.
Args: Args:
@@ -152,7 +157,7 @@ class BdClientBase(ABC):
pass pass
@abstractmethod @abstractmethod
async def detect_pollution(self, clean: bool = False) -> dict: async def detect_pollution(self, clean: bool = False) -> dict[str, Any]:
"""Detect test issues that leaked into production database. """Detect test issues that leaked into production database.
Args: Args:
@@ -164,7 +169,7 @@ class BdClientBase(ABC):
pass pass
@abstractmethod @abstractmethod
async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict: async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict[str, Any]:
"""Run database validation checks. """Run database validation checks.
Args: Args:
@@ -246,7 +251,7 @@ class BdCliClient(BdClientBase):
flags.append("--no-auto-import") flags.append("--no-auto-import")
return flags return flags
async def _run_command(self, *args: str, cwd: str | None = None) -> object: async def _run_command(self, *args: str, cwd: str | None = None) -> Any:
"""Run bd command and parse JSON output. """Run bd command and parse JSON output.
Args: Args:
@@ -638,7 +643,7 @@ class BdCliClient(BdClientBase):
return [BlockedIssue.model_validate(issue) for issue in data] return [BlockedIssue.model_validate(issue) for issue in data]
async def inspect_migration(self) -> dict: async def inspect_migration(self) -> dict[str, Any]:
"""Get migration plan and database state for agent analysis. """Get migration plan and database state for agent analysis.
Returns: Returns:
@@ -649,7 +654,7 @@ class BdCliClient(BdClientBase):
raise BdCommandError("Invalid response for inspect_migration") raise BdCommandError("Invalid response for inspect_migration")
return data return data
async def get_schema_info(self) -> dict: async def get_schema_info(self) -> dict[str, Any]:
"""Get current database schema for inspection. """Get current database schema for inspection.
Returns: Returns:
@@ -660,7 +665,7 @@ class BdCliClient(BdClientBase):
raise BdCommandError("Invalid response for get_schema_info") raise BdCommandError("Invalid response for get_schema_info")
return data return data
async def repair_deps(self, fix: bool = False) -> dict: async def repair_deps(self, fix: bool = False) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references. """Find and optionally fix orphaned dependency references.
Args: Args:
@@ -678,7 +683,7 @@ class BdCliClient(BdClientBase):
raise BdCommandError("Invalid response for repair-deps") raise BdCommandError("Invalid response for repair-deps")
return data return data
async def detect_pollution(self, clean: bool = False) -> dict: async def detect_pollution(self, clean: bool = False) -> dict[str, Any]:
"""Detect test issues that leaked into production database. """Detect test issues that leaked into production database.
Args: Args:
@@ -696,7 +701,7 @@ class BdCliClient(BdClientBase):
raise BdCommandError("Invalid response for detect-pollution") raise BdCommandError("Invalid response for detect-pollution")
return data return data
async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict: async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict[str, Any]:
"""Run database validation checks. """Run database validation checks.
Args: Args:
@@ -804,9 +809,9 @@ def create_bd_client(
current = search_dir.resolve() current = search_dir.resolve()
while True: while True:
beads_dir = current / ".beads" local_beads_dir = current / ".beads"
if beads_dir.is_dir(): if local_beads_dir.is_dir():
sock_path = beads_dir / "bd.sock" sock_path = local_beads_dir / "bd.sock"
if sock_path.exists(): if sock_path.exists():
socket_found = True socket_found = True
break break

View File

@@ -46,7 +46,7 @@ class BdDaemonClient(BdClientBase):
"""Client for calling bd daemon via RPC over Unix socket.""" """Client for calling bd daemon via RPC over Unix socket."""
socket_path: str | None socket_path: str | None
working_dir: str | None working_dir: str
actor: str | None actor: str | None
timeout: float timeout: float
@@ -113,7 +113,7 @@ class BdDaemonClient(BdClientBase):
"Daemon socket not found. Is the daemon running? Try: bd daemon (local) or bd daemon --global" "Daemon socket not found. Is the daemon running? Try: bd daemon (local) or bd daemon --global"
) )
async def _send_request(self, operation: str, args: Dict[str, Any]) -> Dict[str, Any]: async def _send_request(self, operation: str, args: Dict[str, Any]) -> Any:
"""Send RPC request to daemon and get response. """Send RPC request to daemon and get response.
Args: Args:
@@ -192,7 +192,7 @@ class BdDaemonClient(BdClientBase):
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
async def ping(self) -> Dict[str, str]: async def ping(self) -> Dict[str, Any]:
"""Ping daemon to check if it's running. """Ping daemon to check if it's running.
Returns: Returns:
@@ -204,7 +204,8 @@ class BdDaemonClient(BdClientBase):
DaemonError: If request fails DaemonError: If request fails
""" """
data = await self._send_request("ping", {}) data = await self._send_request("ping", {})
return json.loads(data) if isinstance(data, str) else data result = json.loads(data) if isinstance(data, str) else data
return dict(result)
async def health(self) -> Dict[str, Any]: async def health(self) -> Dict[str, Any]:
"""Get daemon health status. """Get daemon health status.
@@ -224,7 +225,24 @@ class BdDaemonClient(BdClientBase):
DaemonError: If request fails DaemonError: If request fails
""" """
data = await self._send_request("health", {}) data = await self._send_request("health", {})
return json.loads(data) if isinstance(data, str) else data result = json.loads(data) if isinstance(data, str) else data
return dict(result)
async def quickstart(self) -> str:
"""Get quickstart guide.
Note: Daemon RPC doesn't support quickstart command.
Returns static guide text pointing users to CLI.
Returns:
Quickstart guide text
"""
return (
"Beads (bd) Quickstart\n\n"
"To get started with beads, please refer to the documentation or use the CLI:\n"
" bd quickstart\n\n"
"For MCP usage, try 'beads list' or 'beads create'."
)
async def init(self, params: Optional[InitParams] = None) -> str: async def init(self, params: Optional[InitParams] = None) -> str:
"""Initialize new beads database (not typically used via daemon). """Initialize new beads database (not typically used via daemon).
@@ -256,7 +274,7 @@ class BdDaemonClient(BdClientBase):
""" """
args = { args = {
"title": params.title, "title": params.title,
"issue_type": params.issue_type or "task", "issue_type": params.issue_type,
"priority": params.priority if params.priority is not None else 2, "priority": params.priority if params.priority is not None else 2,
} }
if params.id: if params.id:
@@ -430,7 +448,7 @@ class BdDaemonClient(BdClientBase):
# This is a placeholder for when it's added # This is a placeholder for when it's added
raise NotImplementedError("Blocked operation not yet supported via daemon") raise NotImplementedError("Blocked operation not yet supported via daemon")
async def inspect_migration(self) -> dict: async def inspect_migration(self) -> dict[str, Any]:
"""Get migration plan and database state for agent analysis. """Get migration plan and database state for agent analysis.
Returns: Returns:
@@ -441,7 +459,7 @@ class BdDaemonClient(BdClientBase):
""" """
raise NotImplementedError("inspect_migration not supported via daemon - use CLI client") raise NotImplementedError("inspect_migration not supported via daemon - use CLI client")
async def get_schema_info(self) -> dict: async def get_schema_info(self) -> dict[str, Any]:
"""Get current database schema for inspection. """Get current database schema for inspection.
Returns: Returns:
@@ -452,7 +470,7 @@ class BdDaemonClient(BdClientBase):
""" """
raise NotImplementedError("get_schema_info not supported via daemon - use CLI client") raise NotImplementedError("get_schema_info not supported via daemon - use CLI client")
async def repair_deps(self, fix: bool = False) -> dict: async def repair_deps(self, fix: bool = False) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references. """Find and optionally fix orphaned dependency references.
Args: Args:
@@ -466,7 +484,7 @@ class BdDaemonClient(BdClientBase):
""" """
raise NotImplementedError("repair_deps not supported via daemon - use CLI client") raise NotImplementedError("repair_deps not supported via daemon - use CLI client")
async def detect_pollution(self, clean: bool = False) -> dict: async def detect_pollution(self, clean: bool = False) -> dict[str, Any]:
"""Detect test issues that leaked into production database. """Detect test issues that leaked into production database.
Args: Args:
@@ -480,7 +498,7 @@ class BdDaemonClient(BdClientBase):
""" """
raise NotImplementedError("detect_pollution not supported via daemon - use CLI client") raise NotImplementedError("detect_pollution not supported via daemon - use CLI client")
async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict: async def validate(self, checks: str | None = None, fix_all: bool = False) -> dict[str, Any]:
"""Run database validation checks. """Run database validation checks.
Args: Args:
@@ -504,7 +522,7 @@ class BdDaemonClient(BdClientBase):
args = { args = {
"from_id": params.issue_id, "from_id": params.issue_id,
"to_id": params.depends_on_id, "to_id": params.depends_on_id,
"dep_type": params.dep_type or "blocks", "dep_type": params.dep_type,
} }
await self._send_request("dep_add", args) await self._send_request("dep_add", args)

View File

@@ -21,7 +21,7 @@ AGENT_MAIL_RETRIES = 2
class MailError(Exception): class MailError(Exception):
"""Base exception for Agent Mail errors.""" """Base exception for Agent Mail errors."""
def __init__(self, code: str, message: str, data: Optional[dict] = None): def __init__(self, code: str, message: str, data: Optional[dict[str, Any]] = None):
self.code = code self.code = code
self.message = message self.message = message
self.data = data or {} self.data = data or {}
@@ -97,9 +97,9 @@ def _get_project_key() -> str:
def _call_agent_mail( def _call_agent_mail(
method: str, method: str,
endpoint: str, endpoint: str,
json_data: Optional[dict] = None, json_data: Optional[dict[str, Any]] = None,
params: Optional[dict] = None, params: Optional[dict[str, Any]] = None,
) -> dict[str, Any]: ) -> Any:
"""Make HTTP request to Agent Mail server with retries. """Make HTTP request to Agent Mail server with retries.
Args: Args:
@@ -205,7 +205,9 @@ def _call_agent_mail(
time.sleep(0.5 * (2**attempt)) time.sleep(0.5 * (2**attempt))
# All retries exhausted # All retries exhausted
raise last_error if last_error:
raise last_error
raise MailError("INTERNAL_ERROR", "Request failed with no error details")
def mail_send( def mail_send(
@@ -350,7 +352,7 @@ def mail_inbox(
) )
# Agent Mail returns list of messages directly # Agent Mail returns list of messages directly
messages = result if isinstance(result, list) else [] messages: list[dict[str, Any]] = result if isinstance(result, list) else []
# Transform to our format and filter unread if requested # Transform to our format and filter unread if requested
formatted_messages = [] formatted_messages = []

View File

@@ -155,7 +155,7 @@ def beads_mail_reply(params: MailReplyParams) -> dict[str, Any]:
return {"error": e.code, "message": e.message, "data": e.data} return {"error": e.code, "message": e.message, "data": e.data}
def beads_mail_ack(params: MailAckParams) -> dict[str, bool]: def beads_mail_ack(params: MailAckParams) -> dict[str, Any]:
"""Acknowledge a message (for ack_required messages). """Acknowledge a message (for ack_required messages).
Safe to call even if message doesn't require acknowledgement. Safe to call even if message doesn't require acknowledgement.
@@ -183,7 +183,7 @@ def beads_mail_ack(params: MailAckParams) -> dict[str, bool]:
return {"error": e.code, "acknowledged": False, "message": e.message} return {"error": e.code, "acknowledged": False, "message": e.message}
def beads_mail_delete(params: MailDeleteParams) -> dict[str, bool]: def beads_mail_delete(params: MailDeleteParams) -> dict[str, Any]:
"""Delete (archive) a message from Agent Mail inbox. """Delete (archive) a message from Agent Mail inbox.
Note: Agent Mail archives messages rather than permanently deleting them. Note: Agent Mail archives messages rather than permanently deleting them.

View File

@@ -0,0 +1 @@

View File

@@ -9,7 +9,8 @@ import signal
import subprocess import subprocess
import sys import sys
from functools import wraps from functools import wraps
from typing import Callable, TypeVar from types import FrameType
from typing import Any, Awaitable, Callable, TypeVar
from fastmcp import FastMCP from fastmcp import FastMCP
@@ -46,7 +47,7 @@ logging.basicConfig(
T = TypeVar("T") T = TypeVar("T")
# Global state for cleanup # Global state for cleanup
_daemon_clients: list = [] _daemon_clients: list[Any] = []
_cleanup_done = False _cleanup_done = False
# Persistent workspace context (survives across MCP tool calls) # Persistent workspace context (survives across MCP tool calls)
@@ -92,7 +93,7 @@ def cleanup() -> None:
logger.info("Cleanup complete") logger.info("Cleanup complete")
def signal_handler(signum: int, frame) -> None: def signal_handler(signum: int, frame: FrameType | None) -> None:
"""Handle termination signals gracefully.""" """Handle termination signals gracefully."""
sig_name = signal.Signals(signum).name sig_name = signal.Signals(signum).name
logger.info(f"Received {sig_name}, shutting down gracefully...") logger.info(f"Received {sig_name}, shutting down gracefully...")
@@ -114,7 +115,7 @@ except importlib.metadata.PackageNotFoundError:
logger.info(f"beads-mcp v{__version__} initialized with lifecycle management") logger.info(f"beads-mcp v{__version__} initialized with lifecycle management")
def with_workspace(func: Callable[..., T]) -> Callable[..., T]: def with_workspace(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
"""Decorator to set workspace context for the duration of a tool call. """Decorator to set workspace context for the duration of a tool call.
Extracts workspace_root parameter from tool call kwargs, resolves it, Extracts workspace_root parameter from tool call kwargs, resolves it,
@@ -124,7 +125,7 @@ def with_workspace(func: Callable[..., T]) -> Callable[..., T]:
This enables per-request workspace routing for multi-project support. This enables per-request workspace routing for multi-project support.
""" """
@wraps(func) @wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args: Any, **kwargs: Any) -> T:
# Extract workspace_root parameter (if provided) # Extract workspace_root parameter (if provided)
workspace_root = kwargs.get('workspace_root') workspace_root = kwargs.get('workspace_root')
@@ -148,7 +149,7 @@ def with_workspace(func: Callable[..., T]) -> Callable[..., T]:
return wrapper return wrapper
def require_context(func: Callable[..., T]) -> Callable[..., T]: def require_context(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
"""Decorator to enforce context has been set before write operations. """Decorator to enforce context has been set before write operations.
Passes if either: Passes if either:
@@ -159,7 +160,7 @@ def require_context(func: Callable[..., T]) -> Callable[..., T]:
This allows backward compatibility while adding safety for multi-repo setups. This allows backward compatibility while adding safety for multi-repo setups.
""" """
@wraps(func) @wraps(func)
async def wrapper(*args, **kwargs): async def wrapper(*args: Any, **kwargs: Any) -> T:
# Only enforce if explicitly enabled # Only enforce if explicitly enabled
if os.environ.get("BEADS_REQUIRE_CONTEXT") == "1": if os.environ.get("BEADS_REQUIRE_CONTEXT") == "1":
# Check ContextVar or environment # Check ContextVar or environment
@@ -453,7 +454,7 @@ async def update_issue(
notes: str | None = None, notes: str | None = None,
external_ref: str | None = None, external_ref: str | None = None,
workspace_root: str | None = None, workspace_root: str | None = None,
) -> Issue: ) -> Issue | list[Issue] | None:
"""Update an existing issue.""" """Update an existing issue."""
# If trying to close via update, redirect to close_issue to preserve approval workflow # If trying to close via update, redirect to close_issue to preserve approval workflow
if status == "closed": if status == "closed":
@@ -577,7 +578,7 @@ async def debug_env(workspace_root: str | None = None) -> str:
description="Get migration plan and database state for agent analysis.", description="Get migration plan and database state for agent analysis.",
) )
@with_workspace @with_workspace
async def inspect_migration(workspace_root: str | None = None) -> dict: async def inspect_migration(workspace_root: str | None = None) -> dict[str, Any]:
"""Get migration plan and database state for agent analysis. """Get migration plan and database state for agent analysis.
AI agents should: AI agents should:
@@ -596,7 +597,7 @@ async def inspect_migration(workspace_root: str | None = None) -> dict:
description="Get current database schema for inspection.", description="Get current database schema for inspection.",
) )
@with_workspace @with_workspace
async def get_schema_info(workspace_root: str | None = None) -> dict: async def get_schema_info(workspace_root: str | None = None) -> dict[str, Any]:
"""Get current database schema for inspection. """Get current database schema for inspection.
Returns tables, schema version, config, sample issue IDs, and detected prefix. Returns tables, schema version, config, sample issue IDs, and detected prefix.
@@ -610,7 +611,7 @@ async def get_schema_info(workspace_root: str | None = None) -> dict:
description="Find and optionally fix orphaned dependency references.", description="Find and optionally fix orphaned dependency references.",
) )
@with_workspace @with_workspace
async def repair_deps(fix: bool = False, workspace_root: str | None = None) -> dict: async def repair_deps(fix: bool = False, workspace_root: str | None = None) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references. """Find and optionally fix orphaned dependency references.
Scans all issues for dependencies pointing to non-existent issues. Scans all issues for dependencies pointing to non-existent issues.
@@ -624,7 +625,7 @@ async def repair_deps(fix: bool = False, workspace_root: str | None = None) -> d
description="Detect test issues that leaked into production database.", description="Detect test issues that leaked into production database.",
) )
@with_workspace @with_workspace
async def detect_pollution(clean: bool = False, workspace_root: str | None = None) -> dict: async def detect_pollution(clean: bool = False, workspace_root: str | None = None) -> dict[str, Any]:
"""Detect test issues that leaked into production database. """Detect test issues that leaked into production database.
Detects test issues using pattern matching (titles starting with 'test', etc.). Detects test issues using pattern matching (titles starting with 'test', etc.).
@@ -642,7 +643,7 @@ async def validate(
checks: str | None = None, checks: str | None = None,
fix_all: bool = False, fix_all: bool = False,
workspace_root: str | None = None, workspace_root: str | None = None,
) -> dict: ) -> dict[str, Any]:
"""Run comprehensive database health checks. """Run comprehensive database health checks.
Available checks: orphans, duplicates, pollution, conflicts. Available checks: orphans, duplicates, pollution, conflicts.

View File

@@ -7,7 +7,7 @@ import subprocess
import sys import sys
from contextvars import ContextVar from contextvars import ContextVar
from functools import lru_cache from functools import lru_cache
from typing import Annotated, TYPE_CHECKING from typing import Annotated, Any, TYPE_CHECKING
from .bd_client import create_bd_client, BdClientBase, BdError from .bd_client import create_bd_client, BdClientBase, BdError
@@ -516,7 +516,7 @@ async def beads_blocked() -> list[BlockedIssue]:
return await client.blocked() return await client.blocked()
async def beads_inspect_migration() -> dict: async def beads_inspect_migration() -> dict[str, Any]:
"""Get migration plan and database state for agent analysis. """Get migration plan and database state for agent analysis.
AI agents should: AI agents should:
@@ -531,7 +531,7 @@ async def beads_inspect_migration() -> dict:
return await client.inspect_migration() return await client.inspect_migration()
async def beads_get_schema_info() -> dict: async def beads_get_schema_info() -> dict[str, Any]:
"""Get current database schema for inspection. """Get current database schema for inspection.
Returns tables, schema version, config, sample issue IDs, and detected prefix. Returns tables, schema version, config, sample issue IDs, and detected prefix.
@@ -543,7 +543,7 @@ async def beads_get_schema_info() -> dict:
async def beads_repair_deps( async def beads_repair_deps(
fix: Annotated[bool, "If True, automatically remove orphaned dependencies"] = False, fix: Annotated[bool, "If True, automatically remove orphaned dependencies"] = False,
) -> dict: ) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references. """Find and optionally fix orphaned dependency references.
Scans all issues for dependencies pointing to non-existent issues. Scans all issues for dependencies pointing to non-existent issues.
@@ -560,7 +560,7 @@ async def beads_repair_deps(
async def beads_detect_pollution( async def beads_detect_pollution(
clean: Annotated[bool, "If True, delete detected test issues"] = False, clean: Annotated[bool, "If True, delete detected test issues"] = False,
) -> dict: ) -> dict[str, Any]:
"""Detect test issues that leaked into production database. """Detect test issues that leaked into production database.
Detects test issues using pattern matching: Detects test issues using pattern matching:
@@ -578,7 +578,7 @@ async def beads_detect_pollution(
async def beads_validate( async def beads_validate(
checks: Annotated[str | None, "Comma-separated list of checks (orphans,duplicates,pollution,conflicts)"] = None, checks: Annotated[str | None, "Comma-separated list of checks (orphans,duplicates,pollution,conflicts)"] = None,
fix_all: Annotated[bool, "If True, auto-fix all fixable issues"] = False, fix_all: Annotated[bool, "If True, auto-fix all fixable issues"] = False,
) -> dict: ) -> dict[str, Any]:
"""Run comprehensive database health checks. """Run comprehensive database health checks.
Available checks: Available checks:

View File

@@ -82,6 +82,7 @@ dev = [
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
{ name = "pytest-cov" }, { name = "pytest-cov" },
{ name = "ruff" }, { name = "ruff" },
{ name = "types-requests" },
] ]
[package.metadata] [package.metadata]
@@ -98,6 +99,7 @@ dev = [
{ name = "pytest-asyncio", specifier = ">=1.2.0" }, { name = "pytest-asyncio", specifier = ">=1.2.0" },
{ name = "pytest-cov", specifier = ">=7.0.0" }, { name = "pytest-cov", specifier = ">=7.0.0" },
{ name = "ruff", specifier = ">=0.14.0" }, { name = "ruff", specifier = ">=0.14.0" },
{ name = "types-requests", specifier = ">=2.31.0" },
] ]
[[package]] [[package]]
@@ -1637,6 +1639,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/77/b8/0135fadc89e73be292b473cb820b4f5a08197779206b33191e801feeae40/tomli-2.3.0-py3-none-any.whl", hash = "sha256:e95b1af3c5b07d9e643909b5abbec77cd9f1217e6d0bca72b0234736b9fb1f1b", size = 14408 }, { url = "https://files.pythonhosted.org/packages/77/b8/0135fadc89e73be292b473cb820b4f5a08197779206b33191e801feeae40/tomli-2.3.0-py3-none-any.whl", hash = "sha256:e95b1af3c5b07d9e643909b5abbec77cd9f1217e6d0bca72b0234736b9fb1f1b", size = 14408 },
] ]
[[package]]
name = "types-requests"
version = "2.32.4.20250913"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/36/27/489922f4505975b11de2b5ad07b4fe1dca0bca9be81a703f26c5f3acfce5/types_requests-2.32.4.20250913.tar.gz", hash = "sha256:abd6d4f9ce3a9383f269775a9835a4c24e5cd6b9f647d64f88aa4613c33def5d", size = 23113 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/20/9a227ea57c1285986c4cf78400d0a91615d25b24e257fd9e2969606bdfae/types_requests-2.32.4.20250913-py3-none-any.whl", hash = "sha256:78c9c1fffebbe0fa487a418e0fa5252017e9c60d1a2da394077f1780f655d7e1", size = 20658 },
]
[[package]] [[package]]
name = "typing-extensions" name = "typing-extensions"
version = "4.15.0" version = "4.15.0"

View File

@@ -0,0 +1,71 @@
package sqlite
import (
"context"
"database/sql"
"fmt"
)
// execer is an interface for types that can execute SQL queries.
// Both *sql.DB and *sql.Tx implement this interface, which lets cache
// maintenance helpers run their statements either inside a caller-supplied
// transaction or directly against the database handle.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// rebuildBlockedCache completely rebuilds the blocked_issues_cache table.
// This is used during cache invalidation when dependencies change.
//
// When tx is non-nil every statement runs inside that transaction; otherwise
// statements execute directly on the storage's database handle.
func (s *SQLiteStorage) rebuildBlockedCache(ctx context.Context, tx *sql.Tx) error {
	// Route statements through the caller's transaction when one is active.
	var runner execer
	if tx != nil {
		runner = tx
	} else {
		runner = s.db
	}

	// Drop all cached rows before repopulating from scratch.
	if _, err := runner.ExecContext(ctx, "DELETE FROM blocked_issues_cache"); err != nil {
		return fmt.Errorf("failed to clear blocked_issues_cache: %w", err)
	}

	// Repopulate using a recursive CTE:
	//   blocked_directly     — issues with a 'blocks' dependency whose blocker
	//                          is still open/in_progress/blocked
	//   blocked_transitively — descendants inheriting blockage through
	//                          parent-child links, depth-capped at 50 to
	//                          bound recursion on pathological graphs
	const rebuildQuery = `
	INSERT INTO blocked_issues_cache (issue_id)
	WITH RECURSIVE
	-- Step 1: Find issues blocked directly by dependencies
	blocked_directly AS (
		SELECT DISTINCT d.issue_id
		FROM dependencies d
		JOIN issues blocker ON d.depends_on_id = blocker.id
		WHERE d.type = 'blocks'
		AND blocker.status IN ('open', 'in_progress', 'blocked')
	),
	-- Step 2: Propagate blockage to all descendants via parent-child
	blocked_transitively AS (
		-- Base case: directly blocked issues
		SELECT issue_id, 0 as depth
		FROM blocked_directly
		UNION ALL
		-- Recursive case: children of blocked issues inherit blockage
		SELECT d.issue_id, bt.depth + 1
		FROM blocked_transitively bt
		JOIN dependencies d ON d.depends_on_id = bt.issue_id
		WHERE d.type = 'parent-child'
		AND bt.depth < 50
	)
	SELECT DISTINCT issue_id FROM blocked_transitively
	`
	if _, err := runner.ExecContext(ctx, rebuildQuery); err != nil {
		return fmt.Errorf("failed to rebuild blocked_issues_cache: %w", err)
	}

	return nil
}
// invalidateBlockedCache refreshes the blocked_issues_cache table after a
// change that can affect blocking: adding or removing a 'blocks' or
// 'parent-child' dependency, or an issue status change.
//
// Currently implemented as a full rebuild via rebuildBlockedCache. tx may be
// nil, in which case the rebuild runs outside any transaction.
func (s *SQLiteStorage) invalidateBlockedCache(ctx context.Context, tx *sql.Tx) error {
	return s.rebuildBlockedCache(ctx, tx)
}

View File

@@ -150,6 +150,14 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
return wrapDBError("mark issues dirty after adding dependency", err) return wrapDBError("mark issues dirty after adding dependency", err)
} }
// Invalidate blocked issues cache since dependencies changed (bd-5qim)
// Only invalidate for 'blocks' and 'parent-child' types since they affect blocking
if dep.Type == types.DepBlocks || dep.Type == types.DepParentChild {
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
}
}
return nil return nil
}) })
} }
@@ -157,6 +165,18 @@ func (s *SQLiteStorage) AddDependency(ctx context.Context, dep *types.Dependency
// RemoveDependency removes a dependency // RemoveDependency removes a dependency
func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error { func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOnID string, actor string) error {
return s.withTx(ctx, func(tx *sql.Tx) error { return s.withTx(ctx, func(tx *sql.Tx) error {
// First, check what type of dependency is being removed
var depType types.DependencyType
err := tx.QueryRowContext(ctx, `
SELECT type FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
`, issueID, dependsOnID).Scan(&depType)
// Store whether cache needs invalidation before deletion
needsCacheInvalidation := false
if err == nil {
needsCacheInvalidation = (depType == types.DepBlocks || depType == types.DepParentChild)
}
result, err := tx.ExecContext(ctx, ` result, err := tx.ExecContext(ctx, `
DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ? DELETE FROM dependencies WHERE issue_id = ? AND depends_on_id = ?
`, issueID, dependsOnID) `, issueID, dependsOnID)
@@ -187,6 +207,13 @@ func (s *SQLiteStorage) RemoveDependency(ctx context.Context, issueID, dependsOn
return wrapDBError("mark issues dirty after removing dependency", err) return wrapDBError("mark issues dirty after removing dependency", err)
} }
// Invalidate blocked issues cache if this was a blocking dependency (bd-5qim)
if needsCacheInvalidation {
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
}
}
return nil return nil
}) })
} }

View File

@@ -31,6 +31,7 @@ var migrationsList = []Migration{
{"source_repo_column", migrations.MigrateSourceRepoColumn}, {"source_repo_column", migrations.MigrateSourceRepoColumn},
{"repo_mtimes_table", migrations.MigrateRepoMtimesTable}, {"repo_mtimes_table", migrations.MigrateRepoMtimesTable},
{"child_counters_table", migrations.MigrateChildCountersTable}, {"child_counters_table", migrations.MigrateChildCountersTable},
{"blocked_issues_cache", migrations.MigrateBlockedIssuesCache},
} }
// MigrationInfo contains metadata about a migration for inspection // MigrationInfo contains metadata about a migration for inspection
@@ -69,6 +70,7 @@ func getMigrationDescription(name string) string {
"source_repo_column": "Adds source_repo column for multi-repo support", "source_repo_column": "Adds source_repo column for multi-repo support",
"repo_mtimes_table": "Adds repo_mtimes table for multi-repo hydration caching", "repo_mtimes_table": "Adds repo_mtimes table for multi-repo hydration caching",
"child_counters_table": "Adds child_counters table for hierarchical ID generation with ON DELETE CASCADE", "child_counters_table": "Adds child_counters table for hierarchical ID generation with ON DELETE CASCADE",
"blocked_issues_cache": "Adds blocked_issues_cache table for GetReadyWork performance optimization (bd-5qim)",
} }
if desc, ok := descriptions[name]; ok { if desc, ok := descriptions[name]; ok {

View File

@@ -0,0 +1,75 @@
package migrations
import (
	"database/sql"
	"errors"
	"fmt"
)
// MigrateBlockedIssuesCache creates and seeds the blocked_issues_cache table,
// which materializes the recursive "blocked issues" computation so that
// GetReadyWork can use a simple lookup instead of re-running the recursive
// CTE on every call (bd-5qim).
//
// The migration is idempotent: if the table already exists it does nothing.
func MigrateBlockedIssuesCache(db *sql.DB) error {
	// Probe sqlite_master to see whether a previous run already created the
	// table.
	var tableName string
	err := db.QueryRow(`
		SELECT name FROM sqlite_master
		WHERE type='table' AND name='blocked_issues_cache'
	`).Scan(&tableName)
	if err == nil {
		// Table already exists - nothing to do.
		return nil
	}
	if !errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf("failed to check for blocked_issues_cache table: %w", err)
	}

	// Create the cache table. ON DELETE CASCADE keeps the cache consistent
	// when issues are deleted.
	if _, err := db.Exec(`
		CREATE TABLE blocked_issues_cache (
			issue_id TEXT NOT NULL,
			PRIMARY KEY (issue_id),
			FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE
		)
	`); err != nil {
		return fmt.Errorf("failed to create blocked_issues_cache table: %w", err)
	}

	// Seed the cache with the same recursive CTE that GetReadyWork
	// previously ran inline: find directly blocked issues, then propagate
	// blockage down the parent-child hierarchy (depth-capped at 50).
	if _, err := db.Exec(`
		INSERT INTO blocked_issues_cache (issue_id)
		WITH RECURSIVE
		-- Step 1: Find issues blocked directly by dependencies
		blocked_directly AS (
			SELECT DISTINCT d.issue_id
			FROM dependencies d
			JOIN issues blocker ON d.depends_on_id = blocker.id
			WHERE d.type = 'blocks'
			AND blocker.status IN ('open', 'in_progress', 'blocked')
		),
		-- Step 2: Propagate blockage to all descendants via parent-child
		blocked_transitively AS (
			-- Base case: directly blocked issues
			SELECT issue_id, 0 as depth
			FROM blocked_directly
			UNION ALL
			-- Recursive case: children of blocked issues inherit blockage
			SELECT d.issue_id, bt.depth + 1
			FROM blocked_transitively bt
			JOIN dependencies d ON d.depends_on_id = bt.issue_id
			WHERE d.type = 'parent-child'
			AND bt.depth < 50
		)
		SELECT DISTINCT issue_id FROM blocked_transitively
	`); err != nil {
		return fmt.Errorf("failed to populate blocked_issues_cache: %w", err)
	}
	return nil
}

View File

@@ -81,47 +81,17 @@ func (s *SQLiteStorage) GetReadyWork(ctx context.Context, filter types.WorkFilte
} }
orderBySQL := buildOrderByClause(sortPolicy) orderBySQL := buildOrderByClause(sortPolicy)
// Query with recursive CTE to propagate blocking through parent-child hierarchy // Use blocked_issues_cache for performance (bd-5qim)
// Algorithm: // Cache is maintained by invalidateBlockedCache() called on dependency/status changes
// 1. Find issues directly blocked by 'blocks' dependencies
// 2. Recursively propagate blockage to all descendants via 'parent-child' links
// 3. Exclude all blocked issues (both direct and transitive) from ready work
// #nosec G201 - safe SQL with controlled formatting // #nosec G201 - safe SQL with controlled formatting
query := fmt.Sprintf(` query := fmt.Sprintf(`
WITH RECURSIVE
-- Step 1: Find issues blocked directly by dependencies
blocked_directly AS (
SELECT DISTINCT d.issue_id
FROM dependencies d
JOIN issues blocker ON d.depends_on_id = blocker.id
WHERE d.type = 'blocks'
AND blocker.status IN ('open', 'in_progress', 'blocked')
),
-- Step 2: Propagate blockage to all descendants via parent-child
blocked_transitively AS (
-- Base case: directly blocked issues
SELECT issue_id, 0 as depth
FROM blocked_directly
UNION ALL
-- Recursive case: children of blocked issues inherit blockage
SELECT d.issue_id, bt.depth + 1
FROM blocked_transitively bt
JOIN dependencies d ON d.depends_on_id = bt.issue_id
WHERE d.type = 'parent-child'
AND bt.depth < 50
)
-- Step 3: Select ready issues (excluding all blocked)
SELECT i.id, i.content_hash, i.title, i.description, i.design, i.acceptance_criteria, i.notes, SELECT i.id, i.content_hash, i.title, i.description, i.design, i.acceptance_criteria, i.notes,
i.status, i.priority, i.issue_type, i.assignee, i.estimated_minutes, i.status, i.priority, i.issue_type, i.assignee, i.estimated_minutes,
i.created_at, i.updated_at, i.closed_at, i.external_ref, i.source_repo i.created_at, i.updated_at, i.closed_at, i.external_ref, i.source_repo
FROM issues i FROM issues i
WHERE %s WHERE %s
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM blocked_transitively WHERE issue_id = i.id SELECT 1 FROM blocked_issues_cache WHERE issue_id = i.id
) )
%s %s
%s %s

View File

@@ -694,6 +694,14 @@ func (s *SQLiteStorage) UpdateIssue(ctx context.Context, id string, updates map[
return fmt.Errorf("failed to mark issue dirty: %w", err) return fmt.Errorf("failed to mark issue dirty: %w", err)
} }
// Invalidate blocked issues cache if status changed (bd-5qim)
// Status changes affect which issues are blocked (blockers must be open/in_progress/blocked)
if _, statusChanged := updates["status"]; statusChanged {
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
}
}
return tx.Commit() return tx.Commit()
} }
@@ -861,6 +869,12 @@ func (s *SQLiteStorage) CloseIssue(ctx context.Context, id string, reason string
return fmt.Errorf("failed to mark issue dirty: %w", err) return fmt.Errorf("failed to mark issue dirty: %w", err)
} }
// Invalidate blocked issues cache since status changed to closed (bd-5qim)
// Closed issues don't block others, so this affects blocking calculations
if err := s.invalidateBlockedCache(ctx, tx); err != nil {
return fmt.Errorf("failed to invalidate blocked cache: %w", err)
}
return tx.Commit() return tx.Commit()
} }