- Add 24 new tests across 6 test classes
- Cover authorization headers, request body validation
- Test reservation edge cases (TTL, special chars, multiple reservations)
- Test timeout configuration and precedence
- Test inbox/notification edge cases (large payloads, Unicode, nested data)
- Fix timeout parameter precedence bug (constructor now overrides env var)
- All 51 tests pass

Closes bd-b134

Amp-Thread-ID: https://ampcode.com/threads/T-2f39e97d-38de-4df4-bf94-ef90184cee8a
Co-authored-by: Amp <amp@ampcode.com>
272 lines
9.2 KiB
Python
272 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Beads Agent Mail Adapter
|
|
|
|
Lightweight HTTP client for MCP Agent Mail server that provides:
|
|
- File reservation system (collision prevention)
|
|
- Real-time notifications between agents
|
|
- Status update coordination
|
|
- Graceful degradation when server unavailable
|
|
|
|
Usage:
|
|
from beads_mail_adapter import AgentMailAdapter
|
|
|
|
adapter = AgentMailAdapter()
|
|
if adapter.enabled:
|
|
adapter.reserve_issue("bd-123")
|
|
adapter.notify("status_changed", {"issue_id": "bd-123", "status": "in_progress"})
|
|
adapter.release_issue("bd-123")
|
|
"""
|
|
|
|
import json
import logging
import os
from typing import Optional, Dict, Any, List
from urllib.error import URLError, HTTPError
from urllib.parse import quote
from urllib.request import Request, urlopen
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AgentMailAdapter:
    """
    Agent Mail HTTP client with health checks and graceful degradation.

    The server is probed once at construction time. If the probe fails,
    ``enabled`` stays False and every public method becomes a no-op that
    reports success (or an empty list), so callers keep working in
    Beads-only mode without special-casing.

    Environment variables:
        AGENT_MAIL_URL: Server URL (default: http://127.0.0.1:8765)
        AGENT_MAIL_TOKEN: Bearer token for authentication
        BEADS_AGENT_NAME: Agent identifier (default: hostname)
        AGENT_MAIL_TIMEOUT: Request timeout in seconds (default: 5)
    """

    # Fallback request timeout (seconds) when neither the constructor
    # argument nor a usable AGENT_MAIL_TIMEOUT value is available.
    _DEFAULT_TIMEOUT = 5

    def __init__(
        self,
        url: Optional[str] = None,
        token: Optional[str] = None,
        agent_name: Optional[str] = None,
        timeout: Optional[int] = None
    ):
        """
        Initialize Agent Mail adapter with health check.

        Args:
            url: Server URL (overrides AGENT_MAIL_URL env var)
            token: Bearer token (overrides AGENT_MAIL_TOKEN env var)
            agent_name: Agent identifier (overrides BEADS_AGENT_NAME env var)
            timeout: HTTP request timeout in seconds (overrides AGENT_MAIL_TIMEOUT env var)
        """
        self.url = url or os.getenv("AGENT_MAIL_URL", "http://127.0.0.1:8765")
        self.token = token or os.getenv("AGENT_MAIL_TOKEN", "")
        self.agent_name = agent_name or os.getenv("BEADS_AGENT_NAME") or self._get_default_agent_name()
        # Constructor argument overrides environment variable
        self.timeout = self._resolve_timeout(timeout)
        self.enabled = False

        # Remove trailing slash from URL so paths can be appended verbatim
        self.url = self.url.rstrip("/")

        # Perform health check on initialization
        self._health_check()

    @staticmethod
    def _resolve_timeout(timeout: Optional[int]) -> int:
        """
        Pick the request timeout: explicit argument, then env var, then default.

        A malformed AGENT_MAIL_TIMEOUT value is logged and ignored instead of
        raising, so a configuration typo cannot prevent the adapter from
        being constructed.

        Args:
            timeout: Value passed to the constructor, or None if not given

        Returns:
            Timeout in seconds
        """
        if timeout is not None:
            return timeout

        raw_value = os.getenv("AGENT_MAIL_TIMEOUT", str(AgentMailAdapter._DEFAULT_TIMEOUT))
        try:
            return int(raw_value)
        except ValueError:
            logger.warning(
                f"Invalid AGENT_MAIL_TIMEOUT value {raw_value!r}, "
                f"using default of {AgentMailAdapter._DEFAULT_TIMEOUT} seconds"
            )
            return AgentMailAdapter._DEFAULT_TIMEOUT

    @staticmethod
    def _quote_segment(value: Any) -> str:
        """
        Percent-encode a value for use as a single URL path segment.

        ``safe=""`` also encodes ``/`` so an identifier containing a slash
        cannot change the route being addressed.
        """
        return quote(str(value), safe="")

    @staticmethod
    def _extract_list(response: Any, key: str) -> List[Dict[str, Any]]:
        """
        Normalize a list-style API response.

        Accepts either a bare JSON array or a JSON object that wraps the
        array under ``key``. Anything else (None, other types, a non-list
        value under ``key``) yields an empty list.
        """
        if isinstance(response, list):
            return response
        if isinstance(response, dict):
            items = response.get(key)
            if isinstance(items, list):
                return items
        return []

    @staticmethod
    def _conflict_response(error: HTTPError) -> Dict[str, Any]:
        """
        Build the result dict for an HTTP 409 (reservation conflict).

        Never raises on a malformed body: undecodable bytes are replaced,
        and a body that is not a JSON object is reported as raw text.

        Returns:
            ``{"error": <message>, "status_code": 409}``
        """
        error_body = error.read().decode("utf-8", errors="replace")
        try:
            error_data = json.loads(error_body)
        except json.JSONDecodeError:
            error_data = None

        if isinstance(error_data, dict):
            logger.warning(f"Agent Mail conflict: {error_data.get('error', 'Unknown error')}")
            return {"error": error_data.get("error"), "status_code": 409}

        logger.warning(f"Agent Mail conflict: {error_body}")
        return {"error": error_body, "status_code": 409}

    def _get_default_agent_name(self) -> str:
        """Get default agent name from hostname or fallback."""
        import socket
        try:
            return socket.gethostname()
        except Exception:
            return "beads-agent"

    def _health_check(self) -> None:
        """
        Check if Agent Mail server is reachable.
        Sets self.enabled based on health check result.
        """
        try:
            # Short fixed timeout so an unreachable server does not stall startup
            response = self._request("GET", "/api/health", timeout=2)
        except Exception as e:
            logger.info(f"Agent Mail server unavailable ({e}), falling back to Beads-only mode")
            self.enabled = False
            return

        if isinstance(response, dict) and response.get("status") == "ok":
            self.enabled = True
            logger.info(f"Agent Mail server available at {self.url}")
        else:
            logger.warning("Agent Mail server health check failed, falling back to Beads-only mode")
            self.enabled = False

    def _request(
        self,
        method: str,
        path: str,
        data: Optional[Dict[str, Any]] = None,
        timeout: Optional[int] = None
    ) -> Optional[Any]:
        """
        Make HTTP request to Agent Mail server.

        Args:
            method: HTTP method (GET, POST, DELETE)
            path: API path (must start with /)
            data: Request body (JSON)
            timeout: Request timeout override

        Returns:
            Decoded response JSON (``{}`` when the success body is empty),
            ``{"error": ..., "status_code": 409}`` on a conflict, or None
            when the adapter is disabled or the request fails
        """
        # While disabled only the health probe is allowed through
        if not self.enabled and not path.endswith("/health"):
            return None

        url = f"{self.url}{path}"
        headers = {"Content-Type": "application/json"}

        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        body = json.dumps(data).encode("utf-8") if data else None

        try:
            req = Request(url, data=body, headers=headers, method=method)
            with urlopen(req, timeout=timeout or self.timeout) as response:
                if response.status in (200, 201, 204):
                    response_data = response.read()
                    if response_data:
                        return json.loads(response_data)
                    return {}
                logger.warning(f"Agent Mail request failed: {method} {path} -> {response.status}")
                return None
        except HTTPError as e:
            if e.code == 409:  # Conflict (reservation already exists)
                return self._conflict_response(e)
            logger.warning(f"Agent Mail HTTP error: {method} {path} -> {e.code} {e.reason}")
            return None
        except URLError as e:
            logger.debug(f"Agent Mail connection error: {e.reason}")
            return None
        except Exception as e:
            # Deliberate best-effort: coordination must never break the caller
            logger.debug(f"Agent Mail request error: {e}")
            return None

    def reserve_issue(self, issue_id: str, ttl: int = 3600) -> bool:
        """
        Reserve an issue to prevent other agents from claiming it.

        Args:
            issue_id: Issue ID (e.g., "bd-123")
            ttl: Reservation time-to-live in seconds (default: 1 hour)

        Returns:
            False only when the server reports that the issue is already
            reserved (HTTP 409). True otherwise, including when the adapter
            is disabled or the request fails (graceful degradation).
        """
        if not self.enabled:
            return True  # No-op in Beads-only mode

        response = self._request(
            "POST",
            "/api/reservations",
            data={
                "file_path": f".beads/issues/{issue_id}",
                "agent_name": self.agent_name,
                "ttl": ttl
            }
        )

        # Only a dict can carry the conflict marker; other shapes are not conflicts
        if isinstance(response, dict) and response.get("status_code") == 409:
            logger.error(f"Issue {issue_id} already reserved: {response.get('error')}")
            return False

        # Graceful degradation: return True if request failed (None)
        return True

    def release_issue(self, issue_id: str) -> bool:
        """
        Release a previously reserved issue.

        Args:
            issue_id: Issue ID to release

        Returns:
            Always True: release is best-effort, so a disabled adapter or a
            failed request is not reported as an error.
        """
        if not self.enabled:
            return True

        agent_segment = self._quote_segment(self.agent_name)
        issue_segment = self._quote_segment(issue_id)
        self._request("DELETE", f"/api/reservations/{agent_segment}/{issue_segment}")
        # Graceful degradation: return True even if request failed
        return True

    def notify(self, event_type: str, data: Dict[str, Any]) -> bool:
        """
        Send notification to other agents.

        Args:
            event_type: Event type (e.g., "status_changed", "issue_completed")
            data: Event payload

        Returns:
            Always True: notification is best-effort, so a disabled adapter
            or a failed request is not reported as an error.
        """
        if not self.enabled:
            return True

        self._request(
            "POST",
            "/api/notifications",
            data={
                "from_agent": self.agent_name,
                "event_type": event_type,
                "payload": data
            }
        )
        # Graceful degradation: return True even if request failed
        return True

    def check_inbox(self) -> List[Dict[str, Any]]:
        """
        Check for incoming notifications from other agents.

        Returns:
            List of notification messages (empty if server unavailable or
            the response has an unexpected shape)
        """
        if not self.enabled:
            return []

        response = self._request(
            "GET", f"/api/notifications/{self._quote_segment(self.agent_name)}"
        )
        # Graceful degradation: empty list if request failed
        return self._extract_list(response, "messages")

    def get_reservations(self) -> List[Dict[str, Any]]:
        """
        Get all active reservations.

        Returns:
            List of active reservations (empty if server unavailable or the
            response has an unexpected shape)
        """
        if not self.enabled:
            return []

        response = self._request("GET", "/api/reservations")
        # Graceful degradation: empty list if request failed
        return self._extract_list(response, "reservations")
|