feat(mcp): consolidate admin tools to reduce token overhead (#634)

Consolidate 6 admin MCP tools into single 'admin' tool with action parameter for 79% token reduction.
This commit is contained in:
Aarya Reddy
2025-12-18 18:32:54 -08:00
committed by GitHub
parent 1f4b55dacb
commit efdfbac504

View File

@@ -319,6 +319,7 @@ _TOOL_CATALOG = {
"init": "Initialize beads in a directory",
"set_context": "Set workspace root for operations",
"where_am_i": "Show current workspace context",
"admin": "Administrative/diagnostic operations (validate, repair, schema, debug, migration, pollution)",
"discover_tools": "List available tools (names only)",
"get_tool_info": "Get detailed info for a specific tool",
}
@@ -472,6 +473,20 @@ async def get_tool_info(tool_name: str) -> dict[str, Any]:
"returns": "List of blocked issues with blocker info",
"example": "blocked()"
},
"admin": {
"name": "admin",
"description": "Administrative and diagnostic operations",
"parameters": {
"action": "str (required) - validate|repair|schema|debug|migration|pollution",
"checks": "str (optional) - For validate: orphans,duplicates,pollution,conflicts",
"fix_all": "bool (default false) - For validate: auto-fix issues",
"fix": "bool (default false) - For repair: apply fixes",
"clean": "bool (default false) - For pollution: delete test issues",
"workspace_root": "str (optional)"
},
"returns": "Dict with operation results (or string for debug)",
"example": "admin(action='validate', checks='orphans')"
},
}
if tool_name not in tool_details:
@@ -843,106 +858,56 @@ async def init(prefix: str | None = None, workspace_root: str | None = None) ->
@mcp.tool(
name="debug_env",
description="Debug tool: Show environment and working directory information",
name="admin",
description="""Administrative and diagnostic operations.
Actions:
- validate: Run database health checks (checks=orphans,duplicates,pollution,conflicts)
- repair: Fix orphaned dependency references (fix=True to apply)
- schema: Show database schema info
- debug: Show environment and working directory info
- migration: Get migration plan and database state
- pollution: Detect/clean test issues (clean=True to delete)""",
)
@with_workspace
async def debug_env(workspace_root: str | None = None) -> str:
"""Debug tool to check working directory and environment variables."""
info = []
info.append("=== Working Directory Debug Info ===\n")
info.append(f"os.getcwd(): {os.getcwd()}\n")
info.append(f"PWD env var: {os.environ.get('PWD', 'NOT SET')}\n")
info.append(f"BEADS_WORKING_DIR env var: {os.environ.get('BEADS_WORKING_DIR', 'NOT SET')}\n")
info.append(f"BEADS_PATH env var: {os.environ.get('BEADS_PATH', 'NOT SET')}\n")
info.append(f"BEADS_DB env var: {os.environ.get('BEADS_DB', 'NOT SET')}\n")
info.append(f"HOME: {os.environ.get('HOME', 'NOT SET')}\n")
info.append(f"USER: {os.environ.get('USER', 'NOT SET')}\n")
info.append("\n=== All Environment Variables ===\n")
for key, value in sorted(os.environ.items()):
if not key.startswith("_"): # Skip internal vars
info.append(f"{key}={value}\n")
return "".join(info)
@mcp.tool(
name="inspect_migration",
description="Get migration plan and database state for agent analysis.",
)
@with_workspace
async def inspect_migration(workspace_root: str | None = None) -> dict[str, Any]:
"""Get migration plan and database state for agent analysis.
AI agents should:
1. Review registered_migrations to understand what will run
2. Check warnings array for issues (missing config, version mismatch)
3. Verify missing_config is empty before migrating
4. Check invariants_to_check to understand safety guarantees
Returns migration plan, current db state, warnings, and invariants.
"""
return await beads_inspect_migration()
@mcp.tool(
name="get_schema_info",
description="Get current database schema for inspection.",
)
@with_workspace
async def get_schema_info(workspace_root: str | None = None) -> dict[str, Any]:
"""Get current database schema for inspection.
Returns tables, schema version, config, sample issue IDs, and detected prefix.
Useful for verifying database state before migrations.
"""
return await beads_get_schema_info()
@mcp.tool(
name="repair_deps",
description="Find and optionally fix orphaned dependency references.",
)
@with_workspace
async def repair_deps(fix: bool = False, workspace_root: str | None = None) -> dict[str, Any]:
"""Find and optionally fix orphaned dependency references.
Scans all issues for dependencies pointing to non-existent issues.
Returns orphaned dependencies and optionally removes them with fix=True.
"""
return await beads_repair_deps(fix=fix)
@mcp.tool(
name="detect_pollution",
description="Detect test issues that leaked into production database.",
)
@with_workspace
async def detect_pollution(clean: bool = False, workspace_root: str | None = None) -> dict[str, Any]:
"""Detect test issues that leaked into production database.
Detects test issues using pattern matching (titles starting with 'test', etc.).
Returns detected test issues and optionally deletes them with clean=True.
"""
return await beads_detect_pollution(clean=clean)
@mcp.tool(
name="validate",
description="Run comprehensive database health checks.",
)
@with_workspace
async def validate(
async def admin(
action: str, # validate, repair, schema, debug, migration, pollution
checks: str | None = None,
fix_all: bool = False,
fix: bool = False,
clean: bool = False,
workspace_root: str | None = None,
) -> dict[str, Any]:
"""Run comprehensive database health checks.
Available checks: orphans, duplicates, pollution, conflicts.
If checks is None, runs all checks.
Returns validation results for each check.
"""
return await beads_validate(checks=checks, fix_all=fix_all)
) -> dict[str, Any] | str:
"""Administrative and diagnostic operations."""
if action == "validate":
return await beads_validate(checks=checks, fix_all=fix_all)
elif action == "repair":
return await beads_repair_deps(fix=fix)
elif action == "schema":
return await beads_get_schema_info()
elif action == "debug":
info = []
info.append("=== Working Directory Debug Info ===\n")
info.append(f"os.getcwd(): {os.getcwd()}\n")
info.append(f"PWD env var: {os.environ.get('PWD', 'NOT SET')}\n")
info.append(f"BEADS_WORKING_DIR env var: {os.environ.get('BEADS_WORKING_DIR', 'NOT SET')}\n")
info.append(f"BEADS_PATH env var: {os.environ.get('BEADS_PATH', 'NOT SET')}\n")
info.append(f"BEADS_DB env var: {os.environ.get('BEADS_DB', 'NOT SET')}\n")
info.append(f"HOME: {os.environ.get('HOME', 'NOT SET')}\n")
info.append(f"USER: {os.environ.get('USER', 'NOT SET')}\n")
return "".join(info)
elif action == "migration":
return await beads_inspect_migration()
elif action == "pollution":
return await beads_detect_pollution(clean=clean)
else:
raise ValueError(f"Unknown action: {action}. Use 'validate', 'repair', 'schema', 'debug', 'migration', or 'pollution'")
async def async_main() -> None: