feat(agent-mail): Add Python adapter library for Agent Mail integration
- Created lib/beads_mail_adapter.py with AgentMailAdapter class - Automatic health check on initialization - Graceful degradation when server unavailable - Methods: reserve_issue(), release_issue(), notify(), check_inbox() - Environment-based configuration (AGENT_MAIL_URL, AGENT_MAIL_TOKEN) - Comprehensive unit tests (15 tests, 100% passing) - Full documentation in lib/README.md Closes bd-m9th Amp-Thread-ID: https://ampcode.com/threads/T-caa26228-0d18-4b35-b98a-9f95f6a099fe Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
173
lib/README.md
Normal file
173
lib/README.md
Normal file
@@ -0,0 +1,173 @@
|
||||
# Beads Agent Mail Adapter
|
||||
|
||||
Lightweight Python library for integrating [MCP Agent Mail](https://github.com/Dicklesworthstone/mcp_agent_mail) with Beads issue tracking.
|
||||
|
||||
## Features
|
||||
|
||||
- **Collision Prevention**: Reserve issues to prevent duplicate work across agents
|
||||
- **Real-Time Coordination**: <100ms latency vs 2-5s with git-only sync
|
||||
- **Graceful Degradation**: Automatically falls back to git-only mode when server unavailable
|
||||
- **Zero Configuration**: Works without Agent Mail (optional enhancement)
|
||||
|
||||
## Installation
|
||||
|
||||
No installation required - just copy `beads_mail_adapter.py` to your project:
|
||||
|
||||
```bash
|
||||
cp lib/beads_mail_adapter.py /path/to/your/agent/
|
||||
```
|
||||
|
||||
## Quick Start
|
||||
|
||||
```python
|
||||
from beads_mail_adapter import AgentMailAdapter
|
||||
|
||||
# Initialize adapter (automatically detects server availability)
|
||||
adapter = AgentMailAdapter()
|
||||
|
||||
if adapter.enabled:
|
||||
print("✅ Agent Mail coordination enabled")
|
||||
else:
|
||||
print("⚠️ Agent Mail unavailable, using git-only mode")
|
||||
|
||||
# Reserve issue before claiming
|
||||
if adapter.reserve_issue("bd-123"):
|
||||
# Claim issue in Beads
|
||||
subprocess.run(["bd", "update", "bd-123", "--status", "in_progress"])
|
||||
|
||||
# Do work...
|
||||
|
||||
# Notify other agents
|
||||
adapter.notify("status_changed", {"issue_id": "bd-123", "status": "completed"})
|
||||
|
||||
# Release reservation
|
||||
adapter.release_issue("bd-123")
|
||||
else:
|
||||
print("❌ Issue bd-123 already reserved by another agent")
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Configure via environment variables:
|
||||
|
||||
```bash
|
||||
# Agent Mail server URL (default: http://127.0.0.1:8765)
|
||||
export AGENT_MAIL_URL=http://localhost:8765
|
||||
|
||||
# Authentication token (optional)
|
||||
export AGENT_MAIL_TOKEN=your-bearer-token
|
||||
|
||||
# Agent identifier (default: hostname)
|
||||
export BEADS_AGENT_NAME=assistant-alpha
|
||||
|
||||
# Request timeout in seconds (default: 5)
|
||||
export AGENT_MAIL_TIMEOUT=5
|
||||
```
|
||||
|
||||
Or pass directly to constructor:
|
||||
|
||||
```python
|
||||
adapter = AgentMailAdapter(
|
||||
url="http://localhost:8765",
|
||||
token="your-token",
|
||||
agent_name="assistant-alpha",
|
||||
timeout=5
|
||||
)
|
||||
```
|
||||
|
||||
## API Reference
|
||||
|
||||
### `AgentMailAdapter(url=None, token=None, agent_name=None, timeout=5)`
|
||||
|
||||
Initialize adapter with optional configuration overrides.
|
||||
|
||||
**Attributes:**
|
||||
- `enabled` (bool): True if server is available, False otherwise
|
||||
|
||||
### `reserve_issue(issue_id: str, ttl: int = 3600) -> bool`
|
||||
|
||||
Reserve an issue to prevent other agents from claiming it.
|
||||
|
||||
**Args:**
|
||||
- `issue_id`: Issue ID (e.g., "bd-123")
|
||||
- `ttl`: Reservation time-to-live in seconds (default: 1 hour)
|
||||
|
||||
**Returns:** True if reservation successful, False if already reserved
|
||||
|
||||
### `release_issue(issue_id: str) -> bool`
|
||||
|
||||
Release a previously reserved issue.
|
||||
|
||||
**Returns:** True on success
|
||||
|
||||
### `notify(event_type: str, data: Dict[str, Any]) -> bool`
|
||||
|
||||
Send notification to other agents.
|
||||
|
||||
**Args:**
|
||||
- `event_type`: Event type (e.g., "status_changed", "issue_completed")
|
||||
- `data`: Event payload
|
||||
|
||||
**Returns:** True on success
|
||||
|
||||
### `check_inbox() -> List[Dict[str, Any]]`
|
||||
|
||||
Check for incoming notifications from other agents.
|
||||
|
||||
**Returns:** List of notification messages (empty if none or server unavailable)
|
||||
|
||||
### `get_reservations() -> List[Dict[str, Any]]`
|
||||
|
||||
Get all active reservations.
|
||||
|
||||
**Returns:** List of active reservations
|
||||
|
||||
## Testing
|
||||
|
||||
Run the test suite:
|
||||
|
||||
```bash
|
||||
cd lib
|
||||
python3 test_beads_mail_adapter.py -v
|
||||
```
|
||||
|
||||
Coverage includes:
|
||||
- Server available/unavailable scenarios
|
||||
- Graceful degradation
|
||||
- Reservation conflicts
|
||||
- Environment variable configuration
|
||||
|
||||
## Integration Examples
|
||||
|
||||
See [examples/python-agent/agent.py](../examples/python-agent/agent.py) for a complete agent implementation.
|
||||
|
||||
## Graceful Degradation
|
||||
|
||||
The adapter is designed to **never block or fail** your agent:
|
||||
|
||||
- If server is unavailable on init → `enabled = False`, all operations no-op
|
||||
- If server dies mid-operation → methods return success (graceful degradation)
|
||||
- If network timeout → operations continue (no blocking)
|
||||
- If 409 conflict on reservation → returns `False` (expected behavior)
|
||||
|
||||
This ensures your agent works identically with or without Agent Mail.
|
||||
|
||||
## When to Use Agent Mail
|
||||
|
||||
**Use Agent Mail when:**
|
||||
- Running multiple AI agents concurrently
|
||||
- Need real-time collision prevention
|
||||
- Want to reduce git commit noise
|
||||
- Need <100ms coordination latency
|
||||
|
||||
**Stick with git-only when:**
|
||||
- Single agent workflow
|
||||
- No concurrent work
|
||||
- Simplicity over speed
|
||||
- No server infrastructure available
|
||||
|
||||
## Resources
|
||||
|
||||
- [ADR 002: Agent Mail Integration](../docs/adr/002-agent-mail-integration.md)
|
||||
- [MCP Agent Mail Repository](https://github.com/Dicklesworthstone/mcp_agent_mail)
|
||||
- [Latency Benchmark Results](../latency_results.md)
|
||||
BIN
lib/__pycache__/beads_mail_adapter.cpython-314.pyc
Normal file
BIN
lib/__pycache__/beads_mail_adapter.cpython-314.pyc
Normal file
Binary file not shown.
267
lib/beads_mail_adapter.py
Normal file
267
lib/beads_mail_adapter.py
Normal file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Beads Agent Mail Adapter
|
||||
|
||||
Lightweight HTTP client for MCP Agent Mail server that provides:
|
||||
- File reservation system (collision prevention)
|
||||
- Real-time notifications between agents
|
||||
- Status update coordination
|
||||
- Graceful degradation when server unavailable
|
||||
|
||||
Usage:
|
||||
from beads_mail_adapter import AgentMailAdapter
|
||||
|
||||
adapter = AgentMailAdapter()
|
||||
if adapter.enabled:
|
||||
adapter.reserve_issue("bd-123")
|
||||
adapter.notify("status_changed", {"issue_id": "bd-123", "status": "in_progress"})
|
||||
adapter.release_issue("bd-123")
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional, Dict, Any, List
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
import json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AgentMailAdapter:
|
||||
"""
|
||||
Agent Mail HTTP client with health checks and graceful degradation.
|
||||
|
||||
Environment variables:
|
||||
AGENT_MAIL_URL: Server URL (default: http://127.0.0.1:8765)
|
||||
AGENT_MAIL_TOKEN: Bearer token for authentication
|
||||
BEADS_AGENT_NAME: Agent identifier (default: hostname)
|
||||
AGENT_MAIL_TIMEOUT: Request timeout in seconds (default: 5)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: Optional[str] = None,
|
||||
token: Optional[str] = None,
|
||||
agent_name: Optional[str] = None,
|
||||
timeout: int = 5
|
||||
):
|
||||
"""
|
||||
Initialize Agent Mail adapter with health check.
|
||||
|
||||
Args:
|
||||
url: Server URL (overrides AGENT_MAIL_URL env var)
|
||||
token: Bearer token (overrides AGENT_MAIL_TOKEN env var)
|
||||
agent_name: Agent identifier (overrides BEADS_AGENT_NAME env var)
|
||||
timeout: HTTP request timeout in seconds
|
||||
"""
|
||||
self.url = url or os.getenv("AGENT_MAIL_URL", "http://127.0.0.1:8765")
|
||||
self.token = token or os.getenv("AGENT_MAIL_TOKEN", "")
|
||||
self.agent_name = agent_name or os.getenv("BEADS_AGENT_NAME") or self._get_default_agent_name()
|
||||
self.timeout = int(os.getenv("AGENT_MAIL_TIMEOUT", str(timeout)))
|
||||
self.enabled = False
|
||||
|
||||
# Remove trailing slash from URL
|
||||
self.url = self.url.rstrip("/")
|
||||
|
||||
# Perform health check on initialization
|
||||
self._health_check()
|
||||
|
||||
def _get_default_agent_name(self) -> str:
|
||||
"""Get default agent name from hostname or fallback."""
|
||||
import socket
|
||||
try:
|
||||
return socket.gethostname()
|
||||
except Exception:
|
||||
return "beads-agent"
|
||||
|
||||
def _health_check(self) -> None:
|
||||
"""
|
||||
Check if Agent Mail server is reachable.
|
||||
Sets self.enabled based on health check result.
|
||||
"""
|
||||
try:
|
||||
response = self._request("GET", "/api/health", timeout=2)
|
||||
if response and response.get("status") == "ok":
|
||||
self.enabled = True
|
||||
logger.info(f"Agent Mail server available at {self.url}")
|
||||
else:
|
||||
logger.warning(f"Agent Mail server health check failed, falling back to Beads-only mode")
|
||||
self.enabled = False
|
||||
except Exception as e:
|
||||
logger.info(f"Agent Mail server unavailable ({e}), falling back to Beads-only mode")
|
||||
self.enabled = False
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
data: Optional[Dict[str, Any]] = None,
|
||||
timeout: Optional[int] = None
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""
|
||||
Make HTTP request to Agent Mail server.
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, DELETE)
|
||||
path: API path (must start with /)
|
||||
data: Request body (JSON)
|
||||
timeout: Request timeout override
|
||||
|
||||
Returns:
|
||||
Response JSON or None on error
|
||||
"""
|
||||
if not self.enabled and not path.endswith("/health"):
|
||||
return None
|
||||
|
||||
url = f"{self.url}{path}"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
|
||||
if self.token:
|
||||
headers["Authorization"] = f"Bearer {self.token}"
|
||||
|
||||
body = json.dumps(data).encode("utf-8") if data else None
|
||||
|
||||
try:
|
||||
req = Request(url, data=body, headers=headers, method=method)
|
||||
with urlopen(req, timeout=timeout or self.timeout) as response:
|
||||
if response.status in (200, 201, 204):
|
||||
response_data = response.read()
|
||||
if response_data:
|
||||
return json.loads(response_data)
|
||||
return {}
|
||||
else:
|
||||
logger.warning(f"Agent Mail request failed: {method} {path} -> {response.status}")
|
||||
return None
|
||||
except HTTPError as e:
|
||||
if e.code == 409: # Conflict (reservation already exists)
|
||||
error_body = e.read().decode("utf-8")
|
||||
try:
|
||||
error_data = json.loads(error_body)
|
||||
logger.warning(f"Agent Mail conflict: {error_data.get('error', 'Unknown error')}")
|
||||
return {"error": error_data.get("error"), "status_code": 409}
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(f"Agent Mail conflict: {error_body}")
|
||||
return {"error": error_body, "status_code": 409}
|
||||
else:
|
||||
logger.warning(f"Agent Mail HTTP error: {method} {path} -> {e.code} {e.reason}")
|
||||
return None
|
||||
except URLError as e:
|
||||
logger.debug(f"Agent Mail connection error: {e.reason}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.debug(f"Agent Mail request error: {e}")
|
||||
return None
|
||||
|
||||
def reserve_issue(self, issue_id: str, ttl: int = 3600) -> bool:
|
||||
"""
|
||||
Reserve an issue to prevent other agents from claiming it.
|
||||
|
||||
Args:
|
||||
issue_id: Issue ID (e.g., "bd-123")
|
||||
ttl: Reservation time-to-live in seconds (default: 1 hour)
|
||||
|
||||
Returns:
|
||||
True if reservation successful, False otherwise
|
||||
"""
|
||||
if not self.enabled:
|
||||
return True # No-op in Beads-only mode
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/reservations",
|
||||
data={
|
||||
"file_path": f".beads/issues/{issue_id}",
|
||||
"agent_name": self.agent_name,
|
||||
"ttl": ttl
|
||||
}
|
||||
)
|
||||
|
||||
if response and response.get("status_code") == 409:
|
||||
logger.error(f"Issue {issue_id} already reserved: {response.get('error')}")
|
||||
return False
|
||||
|
||||
# Graceful degradation: return True if request failed (None)
|
||||
return True
|
||||
|
||||
def release_issue(self, issue_id: str) -> bool:
|
||||
"""
|
||||
Release a previously reserved issue.
|
||||
|
||||
Args:
|
||||
issue_id: Issue ID to release
|
||||
|
||||
Returns:
|
||||
True if release successful, False otherwise
|
||||
"""
|
||||
if not self.enabled:
|
||||
return True
|
||||
|
||||
response = self._request(
|
||||
"DELETE",
|
||||
f"/api/reservations/{self.agent_name}/{issue_id}"
|
||||
)
|
||||
# Graceful degradation: return True even if request failed
|
||||
return True
|
||||
|
||||
def notify(self, event_type: str, data: Dict[str, Any]) -> bool:
|
||||
"""
|
||||
Send notification to other agents.
|
||||
|
||||
Args:
|
||||
event_type: Event type (e.g., "status_changed", "issue_completed")
|
||||
data: Event payload
|
||||
|
||||
Returns:
|
||||
True if notification sent, False otherwise
|
||||
"""
|
||||
if not self.enabled:
|
||||
return True
|
||||
|
||||
response = self._request(
|
||||
"POST",
|
||||
"/api/notifications",
|
||||
data={
|
||||
"from_agent": self.agent_name,
|
||||
"event_type": event_type,
|
||||
"payload": data
|
||||
}
|
||||
)
|
||||
# Graceful degradation: return True even if request failed
|
||||
return True
|
||||
|
||||
def check_inbox(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Check for incoming notifications from other agents.
|
||||
|
||||
Returns:
|
||||
List of notification messages (empty if server unavailable)
|
||||
"""
|
||||
if not self.enabled:
|
||||
return []
|
||||
|
||||
response = self._request("GET", f"/api/notifications/{self.agent_name}")
|
||||
if response and isinstance(response, list):
|
||||
return response
|
||||
elif response and "messages" in response:
|
||||
return response["messages"]
|
||||
# Graceful degradation: return empty list if request failed
|
||||
return []
|
||||
|
||||
def get_reservations(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all active reservations.
|
||||
|
||||
Returns:
|
||||
List of active reservations
|
||||
"""
|
||||
if not self.enabled:
|
||||
return []
|
||||
|
||||
response = self._request("GET", "/api/reservations")
|
||||
if response and isinstance(response, list):
|
||||
return response
|
||||
elif response and "reservations" in response:
|
||||
return response["reservations"]
|
||||
# Graceful degradation: return empty list if request failed
|
||||
return []
|
||||
285
lib/test_beads_mail_adapter.py
Normal file
285
lib/test_beads_mail_adapter.py
Normal file
@@ -0,0 +1,285 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for beads_mail_adapter.py
|
||||
|
||||
Tests cover:
|
||||
- Enabled mode (server available)
|
||||
- Disabled mode (server unavailable)
|
||||
- Graceful degradation (server dies mid-operation)
|
||||
- Reservation conflicts
|
||||
- Message sending/receiving
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import patch, Mock, MagicMock
|
||||
from urllib.error import URLError, HTTPError
|
||||
from io import BytesIO
|
||||
|
||||
from beads_mail_adapter import AgentMailAdapter
|
||||
|
||||
|
||||
class TestAgentMailAdapterDisabled(unittest.TestCase):
|
||||
"""Test adapter when server is unavailable (disabled mode)."""
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_init_server_unavailable(self, mock_urlopen):
|
||||
"""Test initialization when server is unreachable."""
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
adapter = AgentMailAdapter(
|
||||
url="http://localhost:9999",
|
||||
token="test-token",
|
||||
agent_name="test-agent"
|
||||
)
|
||||
|
||||
self.assertFalse(adapter.enabled)
|
||||
self.assertEqual(adapter.url, "http://localhost:9999")
|
||||
self.assertEqual(adapter.agent_name, "test-agent")
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_operations_no_op_when_disabled(self, mock_urlopen):
|
||||
"""Test that all operations gracefully no-op when disabled."""
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
adapter = AgentMailAdapter()
|
||||
self.assertFalse(adapter.enabled)
|
||||
|
||||
# All operations should succeed without making requests
|
||||
self.assertTrue(adapter.reserve_issue("bd-123"))
|
||||
self.assertTrue(adapter.release_issue("bd-123"))
|
||||
self.assertTrue(adapter.notify("test", {"foo": "bar"}))
|
||||
self.assertEqual(adapter.check_inbox(), [])
|
||||
self.assertEqual(adapter.get_reservations(), [])
|
||||
|
||||
|
||||
class TestAgentMailAdapterEnabled(unittest.TestCase):
|
||||
"""Test adapter when server is available (enabled mode)."""
|
||||
|
||||
def _mock_response(self, status_code=200, data=None):
|
||||
"""Create mock HTTP response."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status = status_code
|
||||
mock_response.__enter__ = Mock(return_value=mock_response)
|
||||
mock_response.__exit__ = Mock(return_value=False)
|
||||
|
||||
if data is not None:
|
||||
mock_response.read.return_value = json.dumps(data).encode('utf-8')
|
||||
else:
|
||||
mock_response.read.return_value = b''
|
||||
|
||||
return mock_response
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_init_server_available(self, mock_urlopen):
|
||||
"""Test initialization when server is healthy."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
|
||||
adapter = AgentMailAdapter(
|
||||
url="http://localhost:8765",
|
||||
token="test-token",
|
||||
agent_name="test-agent"
|
||||
)
|
||||
|
||||
self.assertTrue(adapter.enabled)
|
||||
self.assertEqual(adapter.url, "http://localhost:8765")
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_reserve_issue_success(self, mock_urlopen):
|
||||
"""Test successful issue reservation."""
|
||||
# Health check
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
# Reservation request
|
||||
mock_urlopen.return_value = self._mock_response(201, {"reserved": True})
|
||||
result = adapter.reserve_issue("bd-123")
|
||||
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(adapter.enabled)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_reserve_issue_conflict(self, mock_urlopen):
|
||||
"""Test reservation conflict (issue already reserved)."""
|
||||
# Health check
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
# Simulate 409 Conflict
|
||||
error_response = HTTPError(
|
||||
url="http://test",
|
||||
code=409,
|
||||
msg="Conflict",
|
||||
hdrs={},
|
||||
fp=BytesIO(json.dumps({"error": "Already reserved by other-agent"}).encode('utf-8'))
|
||||
)
|
||||
mock_urlopen.side_effect = error_response
|
||||
|
||||
result = adapter.reserve_issue("bd-123")
|
||||
|
||||
self.assertFalse(result)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_release_issue_success(self, mock_urlopen):
|
||||
"""Test successful issue release."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
mock_urlopen.return_value = self._mock_response(204)
|
||||
result = adapter.release_issue("bd-123")
|
||||
|
||||
self.assertTrue(result)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_notify_success(self, mock_urlopen):
|
||||
"""Test sending notification."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
mock_urlopen.return_value = self._mock_response(201, {"sent": True})
|
||||
result = adapter.notify("status_changed", {"issue_id": "bd-123", "status": "in_progress"})
|
||||
|
||||
self.assertTrue(result)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_check_inbox_with_messages(self, mock_urlopen):
|
||||
"""Test checking inbox with messages."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
messages = [
|
||||
{"from": "agent-1", "event": "completed", "data": {"issue_id": "bd-42"}},
|
||||
{"from": "agent-2", "event": "started", "data": {"issue_id": "bd-99"}}
|
||||
]
|
||||
mock_urlopen.return_value = self._mock_response(200, messages)
|
||||
|
||||
result = adapter.check_inbox()
|
||||
|
||||
self.assertEqual(len(result), 2)
|
||||
self.assertEqual(result[0]["from"], "agent-1")
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_check_inbox_empty(self, mock_urlopen):
|
||||
"""Test checking empty inbox."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
mock_urlopen.return_value = self._mock_response(200, [])
|
||||
result = adapter.check_inbox()
|
||||
|
||||
self.assertEqual(result, [])
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_get_reservations(self, mock_urlopen):
|
||||
"""Test getting all reservations."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(agent_name="test-agent")
|
||||
|
||||
reservations = [
|
||||
{"issue_id": "bd-123", "agent": "agent-1"},
|
||||
{"issue_id": "bd-456", "agent": "agent-2"}
|
||||
]
|
||||
mock_urlopen.return_value = self._mock_response(200, reservations)
|
||||
|
||||
result = adapter.get_reservations()
|
||||
|
||||
self.assertEqual(len(result), 2)
|
||||
|
||||
|
||||
class TestGracefulDegradation(unittest.TestCase):
|
||||
"""Test graceful degradation when server fails mid-operation."""
|
||||
|
||||
def _mock_response(self, status_code=200, data=None):
|
||||
"""Create mock HTTP response."""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status = status_code
|
||||
mock_response.__enter__ = Mock(return_value=mock_response)
|
||||
mock_response.__exit__ = Mock(return_value=False)
|
||||
|
||||
if data is not None:
|
||||
mock_response.read.return_value = json.dumps(data).encode('utf-8')
|
||||
else:
|
||||
mock_response.read.return_value = b''
|
||||
|
||||
return mock_response
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_server_dies_mid_operation(self, mock_urlopen):
|
||||
"""Test that operations gracefully handle server failures."""
|
||||
# Initially healthy
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter()
|
||||
self.assertTrue(adapter.enabled)
|
||||
|
||||
# Server dies during operation
|
||||
mock_urlopen.side_effect = URLError("Connection refused")
|
||||
|
||||
# Operations should still succeed (graceful degradation)
|
||||
self.assertTrue(adapter.reserve_issue("bd-123"))
|
||||
self.assertTrue(adapter.release_issue("bd-123"))
|
||||
self.assertTrue(adapter.notify("test", {}))
|
||||
self.assertEqual(adapter.check_inbox(), [])
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_network_timeout(self, mock_urlopen):
|
||||
"""Test handling of network timeouts."""
|
||||
mock_urlopen.return_value = self._mock_response(200, {"status": "ok"})
|
||||
adapter = AgentMailAdapter(timeout=1)
|
||||
|
||||
mock_urlopen.side_effect = URLError("Timeout")
|
||||
|
||||
# Should not crash
|
||||
self.assertTrue(adapter.reserve_issue("bd-123"))
|
||||
|
||||
|
||||
class TestConfiguration(unittest.TestCase):
|
||||
"""Test environment variable configuration."""
|
||||
|
||||
@patch.dict(os.environ, {
|
||||
'AGENT_MAIL_URL': 'http://custom:9000',
|
||||
'AGENT_MAIL_TOKEN': 'custom-token',
|
||||
'BEADS_AGENT_NAME': 'custom-agent',
|
||||
'AGENT_MAIL_TIMEOUT': '10'
|
||||
})
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_env_var_configuration(self, mock_urlopen):
|
||||
"""Test configuration from environment variables."""
|
||||
mock_urlopen.side_effect = URLError("Not testing connection")
|
||||
|
||||
adapter = AgentMailAdapter()
|
||||
|
||||
self.assertEqual(adapter.url, "http://custom:9000")
|
||||
self.assertEqual(adapter.token, "custom-token")
|
||||
self.assertEqual(adapter.agent_name, "custom-agent")
|
||||
self.assertEqual(adapter.timeout, 10)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_constructor_overrides_env(self, mock_urlopen):
|
||||
"""Test that constructor args override environment variables."""
|
||||
mock_urlopen.side_effect = URLError("Not testing connection")
|
||||
|
||||
adapter = AgentMailAdapter(
|
||||
url="http://override:8765",
|
||||
token="override-token",
|
||||
agent_name="override-agent",
|
||||
timeout=3
|
||||
)
|
||||
|
||||
self.assertEqual(adapter.url, "http://override:8765")
|
||||
self.assertEqual(adapter.token, "override-token")
|
||||
self.assertEqual(adapter.agent_name, "override-agent")
|
||||
self.assertEqual(adapter.timeout, 3)
|
||||
|
||||
@patch('beads_mail_adapter.urlopen')
|
||||
def test_url_trailing_slash_removed(self, mock_urlopen):
|
||||
"""Test that trailing slashes are removed from URL."""
|
||||
mock_urlopen.side_effect = URLError("Not testing connection")
|
||||
|
||||
adapter = AgentMailAdapter(url="http://localhost:8765/")
|
||||
|
||||
self.assertEqual(adapter.url, "http://localhost:8765")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user