refactor: Remove legacy MCP Agent Mail integration (bd-6gd)

Remove the external MCP Agent Mail server integration that required
running a separate HTTP server and configuring environment variables.

The native `bd mail` system (stored as git-synced issues) remains
unchanged and is the recommended approach for inter-agent messaging.

Files removed:
- cmd/bd/message.go - Legacy `bd message` command
- integrations/beads-mcp/src/beads_mcp/mail.py, mail_tools.py
- lib/beads_mail_adapter.py - Python adapter library
- examples/go-agent/ - Agent Mail-focused example
- examples/python-agent/agent_with_mail.py, AGENT_MAIL_EXAMPLE.md
- docs/AGENT_MAIL*.md, docs/adr/002-agent-mail-integration.md
- tests/integration/test_agent_race.py, test_mail_failures.py, etc.
- tests/benchmarks/ - Agent Mail benchmarks

Updated documentation to remove Agent Mail references while keeping
native `bd mail` documentation intact.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-17 23:14:05 -08:00
parent 6920cd5224
commit 83ae110508
38 changed files with 267 additions and 10253 deletions

View File

@@ -1,196 +0,0 @@
# Agent Mail Integration Test Coverage
## Test Suite Summary
**Total test time**: ~55 seconds (all suites)
**Total tests**: 66 tests across 5 files
## Coverage by Category
### 1. HTTP Adapter Unit Tests (`lib/test_beads_mail_adapter.py`)
**51 tests in 0.019s**
**Enabled/Disabled Mode**
- Server available vs unavailable
- Graceful degradation when server dies mid-operation
- Operations no-op when disabled
**Reservation Operations**
- Successful reservation (201)
- Conflict handling (409)
- Custom TTL support
- Multiple reservations by same agent
- Release operations (204)
- Double release idempotency
**HTTP Error Handling**
- 500 Internal Server Error
- 404 Not Found
- 409 Conflict with malformed body
- Network timeouts
- Malformed JSON responses
- Empty response bodies (204 No Content)
**Configuration**
- Environment variable configuration
- Constructor parameter overrides
- URL normalization (trailing slash removal)
- Default agent name from hostname
- Timeout configuration
**Authorization**
- Bearer token headers
- Missing token behavior
- Content-Type headers
**Request Validation**
- Body structure for reservations
- Body structure for notifications
- URL structure for releases
- URL structure for inbox checks
**Inbox & Notifications**
- Send notifications
- Check inbox with messages
- Empty inbox handling
- Dict wrapper responses
- Large message lists (100 messages)
- Nested payload data
- Empty and large payloads
- Unicode handling
### 2. Multi-Agent Race Conditions (`tests/integration/test_agent_race.py`)
**3 tests in ~15s**
**Collision Prevention**
- 3 agents competing for 1 issue (WITH Agent Mail)
- Only one winner with reservations
- Multiple agents without Agent Mail (collision demo)
**Stress Testing**
- 10 agents competing for 1 issue
- Exactly one winner guaranteed
- JSONL consistency verification
### 3. Server Failure Scenarios (`tests/integration/test_mail_failures.py`)
**7 tests in ~20s**
**Failure Modes**
- Server never started (connection refused)
- Server crash during operation
- Network partition (timeout)
- Server 500 errors
- Invalid bearer token (401)
- Malformed JSON responses
**Graceful Degradation**
- Agents continue working in Beads-only mode
- JSONL remains consistent across failures
- No crashes or data loss
### 4. Reservation TTL & Expiration (`tests/integration/test_reservation_ttl.py`)
**4 tests in ~60s** (includes 30s waits for expiration)
**Time-Based Behavior**
- Short TTL reservations (30s)
- Reservation blocking verification
- Auto-release after expiration
- Renewal/heartbeat mechanisms
### 5. Multi-Agent Coordination (`tests/integration/test_multi_agent_coordination.py`)
**4 tests in ~11s** ⭐ NEW
**Fairness**
- 10 agents competing for 5 issues
- Each issue claimed exactly once
- No duplicate claims in JSONL
**Notifications**
- End-to-end message delivery
- Inbox consumption (messages cleared after read)
- Message structure validation
**Handoff Scenarios**
- Agent releases, another immediately claims
- Clean reservation ownership transfer
**Idempotency**
- Double reserve by same agent (safe)
- Double release by same agent (safe)
- Reservation count verification
## Coverage Gaps (Intentionally Not Tested)
### Low-Priority Edge Cases
- **Path traversal in issue IDs**: Issue IDs are validated elsewhere in bd
- **429 Retry-After logic**: Nice-to-have, not critical for v1
- **HTTPS/TLS verification**: Out of scope for integration layer
- **Re-enable after recovery**: Complex, requires persistent health checking
- **Token rotation mid-run**: Rare scenario, not worth complexity
- **Slow tests**: 50+ agent stress tests, soak tests, inbox flood (>10k messages)
### Why Skipped
These scenarios are either:
1. **Validated elsewhere** (e.g., issue ID validation in bd core)
2. **Low probability** (e.g., token rotation during agent run)
3. **Nice-to-have features** (e.g., automatic re-enable, retry policies)
4. **Too slow for CI** (e.g., multi-hour soak tests, 50-agent races)
## Test Execution
### Run All Tests
```bash
# Unit tests (fast, 0.02s)
python3 lib/test_beads_mail_adapter.py
# Multi-agent coordination (11s)
python3 tests/integration/test_multi_agent_coordination.py
# Race conditions (15s, requires Agent Mail server or falls back)
python3 tests/integration/test_agent_race.py
# Failure scenarios (20s)
python3 tests/integration/test_mail_failures.py
# TTL/expiration (60s - includes deliberate waits)
python3 tests/integration/test_reservation_ttl.py
```
### Quick Validation (No Slow Tests)
```bash
python3 lib/test_beads_mail_adapter.py
python3 tests/integration/test_multi_agent_coordination.py
python3 tests/integration/test_mail_failures.py
# Total: ~31s
```
## Assertions Verified
**Correctness**
- Only one agent claims each issue (collision prevention)
- Notifications deliver correctly
- Reservations block other agents
- JSONL remains consistent across all failure modes
**Reliability**
- Graceful degradation when server unavailable
- Idempotent operations don't corrupt state
- Expired reservations auto-release
- Handoffs work cleanly
**Performance**
- Fast timeout detection (1-2s)
- No blocking on server failures
- Tests complete in reasonable time (<2min total)
## Future Enhancements (Optional)
If real-world usage reveals issues:
1. **Retry policies** with exponential backoff for 429/5xx
2. **Pagination** for inbox/reservations (if >1k messages)
3. **Automatic re-enable** with periodic health checks
4. **Agent instance IDs** to prevent same-name collisions
5. **Soak/stress testing** for production validation
Current test suite provides **strong confidence** for multi-agent workflows without overengineering.

View File

@@ -2,102 +2,18 @@
This directory contains integration tests for bd (beads) that test end-to-end functionality.
## Tests
### test_agent_race.py
Multi-agent race condition test that validates collision prevention with Agent Mail.
**What it tests:**
- Multiple agents simultaneously attempting to claim the same issue
- WITH Agent Mail: Only one agent succeeds (via reservation)
- WITHOUT Agent Mail: Multiple agents may succeed (collision)
- Verification via JSONL that no duplicate claims occur
### test_mail_failures.py
Agent Mail server failure scenarios test that validates graceful degradation.
**What it tests:**
- Server never started (connection refused)
- Server crashes during operation
- Network partition (timeout)
- Server returns 500 errors
- Invalid bearer token (401)
- Malformed JSON responses
- JSONL consistency under multiple failures
**Performance:**
- Uses `--no-daemon` flag for fast tests (~33s total)
- 1s HTTP timeouts for quick failure detection
- Mock HTTP server avoids real network calls
### test_reservation_ttl.py
Reservation TTL and expiration test that validates time-based reservation behavior.
**What it tests:**
- Short TTL reservations (30s)
- Reservation blocking verification (agent2 cannot claim while agent1 holds reservation)
- Auto-release after expiration (expired reservations become available)
- Renewal/heartbeat mechanism (re-reserving extends expiration)
**Performance:**
- Uses `--no-daemon` flag for fast tests
- 30s TTL for expiration tests (includes wait time)
- Total test time: ~57s (includes 30s+ waiting for expiration)
- Mock HTTP server with full TTL management
## Prerequisites
- bd installed: `go install github.com/steveyegge/beads/cmd/bd@latest`
- Agent Mail server running (optional, for full test suite):
```bash
cd ~/src/mcp_agent_mail
source .venv/bin/activate
uv run python -m mcp_agent_mail.cli serve-http
```
- Python 3.7+ for Python-based tests
## Running Tests
**Run test_agent_race.py:**
```bash
python3 tests/integration/test_agent_race.py
# Run all integration tests
python3 -m pytest tests/integration/
```
**Run test_mail_failures.py:**
```bash
python3 tests/integration/test_mail_failures.py
```
**Run test_reservation_ttl.py:**
```bash
python3 tests/integration/test_reservation_ttl.py
```
**Run all integration tests:**
```bash
python3 tests/integration/test_agent_race.py
python3 tests/integration/test_mail_failures.py
python3 tests/integration/test_reservation_ttl.py
```
## Expected Results
### test_agent_race.py
- **WITH Agent Mail running:** Test 1 passes (only 1 claim), Test 2 shows collision, Test 3 passes
- **WITHOUT Agent Mail running:** All tests demonstrate collision (expected behavior without reservation system)
### test_mail_failures.py
- All 7 tests should pass in ~30-35 seconds
- Each test validates graceful degradation to Beads-only mode
- JSONL remains consistent across all failure scenarios
### test_reservation_ttl.py
- All 4 tests should pass in ~57 seconds
- Tests verify TTL-based reservation expiration and renewal
- Includes 30s+ wait time to validate actual expiration behavior
## Adding New Tests
Integration tests should:

View File

@@ -1,414 +0,0 @@
#!/usr/bin/env python3
"""
Multi-agent race condition test for bd (beads) issue tracker.
Tests verify that when 2+ agents simultaneously try to claim the same issue:
1. WITH Agent Mail: Only one agent succeeds (via reservation), others skip gracefully
2. WITHOUT Agent Mail: Both agents may succeed (demonstrating the collision problem)
This test validates the collision prevention mechanism provided by Agent Mail.
"""
import json
import subprocess
import tempfile
import shutil
import os
import sys
import time
from pathlib import Path
from multiprocessing import Process, Queue
from typing import List, Tuple
# Add lib directory for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
class RaceTestAgent:
"""Minimal agent implementation for race condition testing."""
def __init__(self, agent_name: str, workspace: str, mail_enabled: bool = True):
self.agent_name = agent_name
self.workspace = workspace
self.mail_enabled = mail_enabled
# Initialize Agent Mail adapter
if mail_enabled:
self.mail = AgentMailAdapter(agent_name=agent_name)
else:
self.mail = None
def run_bd(self, *args) -> dict:
"""Run bd command in the test workspace."""
cmd = ["bd"] + list(args) + ["--json"]
result = subprocess.run(
cmd,
cwd=self.workspace,
capture_output=True,
text=True
)
if result.returncode != 0:
return {"error": result.stderr}
if result.stdout.strip():
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {"error": "Invalid JSON", "output": result.stdout}
return {}
def try_claim_issue(self, issue_id: str) -> Tuple[bool, str]:
"""
Attempt to claim an issue.
Returns:
(success: bool, message: str)
"""
# Integration Point 2: Reserve before claiming (if Agent Mail enabled)
if self.mail and self.mail.enabled:
reserved = self.mail.reserve_issue(issue_id)
if not reserved:
return False, f"Reservation failed for {issue_id}"
# Claim the issue
result = self.run_bd("update", issue_id, "--status", "in_progress")
if "error" in result:
if self.mail and self.mail.enabled:
self.mail.release_issue(issue_id)
return False, f"Update failed: {result['error']}"
return True, f"Successfully claimed {issue_id}"
def release_issue(self, issue_id: str):
"""Release an issue after claiming."""
if self.mail and self.mail.enabled:
self.mail.release_issue(issue_id)
def agent_worker(agent_name: str, workspace: str, target_issue_id: str,
mail_enabled: bool, result_queue: Queue):
"""
Worker function for multiprocessing.
Each worker tries to claim the same issue. Result is put in queue.
"""
try:
agent = RaceTestAgent(agent_name, workspace, mail_enabled)
# Small random delay to increase likelihood of collision
time.sleep(0.01 * hash(agent_name) % 10)
success, message = agent.try_claim_issue(target_issue_id)
result_queue.put({
"agent": agent_name,
"success": success,
"message": message,
"mail_enabled": mail_enabled
})
except Exception as e:
result_queue.put({
"agent": agent_name,
"success": False,
"message": f"Exception: {str(e)}",
"mail_enabled": mail_enabled
})
def run_race_test(num_agents: int, mail_enabled: bool) -> List[dict]:
"""
Run a race test with N agents trying to claim the same issue.
Args:
num_agents: Number of agents to spawn
mail_enabled: Whether Agent Mail is enabled
Returns:
List of result dicts from each agent
"""
# Create temporary workspace
workspace = tempfile.mkdtemp(prefix="bd-race-test-")
try:
# Initialize bd in workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Create a test issue
result = subprocess.run(
["bd", "create", "Contested issue", "-p", "1", "--json"],
cwd=workspace,
capture_output=True,
text=True,
check=True
)
issue_data = json.loads(result.stdout)
issue_id = issue_data["id"]
# Spawn agents in parallel
result_queue = Queue()
processes = []
for i in range(num_agents):
agent_name = f"agent-{i+1}"
p = Process(
target=agent_worker,
args=(agent_name, workspace, issue_id, mail_enabled, result_queue)
)
processes.append(p)
# Start all processes simultaneously
start_time = time.time()
for p in processes:
p.start()
# Wait for completion
for p in processes:
p.join(timeout=10)
elapsed = time.time() - start_time
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
# Verify JSONL for duplicate claims
jsonl_path = Path(workspace) / ".beads" / "issues.jsonl"
jsonl_claims = verify_jsonl_claims(jsonl_path, issue_id)
return {
"issue_id": issue_id,
"agents": results,
"elapsed_seconds": elapsed,
"jsonl_status_changes": jsonl_claims,
"mail_enabled": mail_enabled
}
finally:
# Cleanup
shutil.rmtree(workspace, ignore_errors=True)
def verify_jsonl_claims(jsonl_path: Path, issue_id: str) -> List[dict]:
"""
Parse JSONL and count how many times the issue status was changed to in_progress.
Returns list of status change events.
"""
if not jsonl_path.exists():
return []
status_changes = []
with open(jsonl_path) as f:
for line in f:
if not line.strip():
continue
try:
record = json.loads(line)
if record.get("id") == issue_id and record.get("status") == "in_progress":
status_changes.append({
"updated_at": record.get("updated_at"),
"assignee": record.get("assignee")
})
except json.JSONDecodeError:
continue
return status_changes
def test_agent_race_with_mail():
"""Test that WITH Agent Mail, only one agent succeeds."""
print("\n" + "="*70)
print("TEST 1: Race condition WITH Agent Mail (collision prevention)")
print("="*70)
num_agents = 3
result = run_race_test(num_agents, mail_enabled=True)
# Analyze results
successful_agents = [a for a in result["agents"] if a["success"]]
failed_agents = [a for a in result["agents"] if not a["success"]]
print(f"\n📊 Results ({result['elapsed_seconds']:.3f}s):")
print(f" • Total agents: {num_agents}")
print(f" • Successful claims: {len(successful_agents)}")
print(f" • Failed claims: {len(failed_agents)}")
print(f" • JSONL status changes: {len(result['jsonl_status_changes'])}")
for agent in result["agents"]:
status = "" if agent["success"] else ""
print(f" {status} {agent['agent']}: {agent['message']}")
# Verify: Only one agent should succeed
assert len(successful_agents) == 1, \
f"Expected 1 successful claim, got {len(successful_agents)}"
# Verify: JSONL should have exactly 1 in_progress status
assert len(result['jsonl_status_changes']) == 1, \
f"Expected 1 JSONL status change, got {len(result['jsonl_status_changes'])}"
print("\n✅ PASS: Agent Mail prevented duplicate claims")
return True
def test_agent_race_without_mail():
"""Test that WITHOUT Agent Mail, multiple agents may succeed (collision)."""
print("\n" + "="*70)
print("TEST 2: Race condition WITHOUT Agent Mail (collision demonstration)")
print("="*70)
print("⚠️ Note: This test may occasionally pass if timing prevents collision")
num_agents = 3
result = run_race_test(num_agents, mail_enabled=False)
# Analyze results
successful_agents = [a for a in result["agents"] if a["success"]]
failed_agents = [a for a in result["agents"] if not a["success"]]
print(f"\n📊 Results ({result['elapsed_seconds']:.3f}s):")
print(f" • Total agents: {num_agents}")
print(f" • Successful claims: {len(successful_agents)}")
print(f" • Failed claims: {len(failed_agents)}")
print(f" • JSONL status changes: {len(result['jsonl_status_changes'])}")
for agent in result["agents"]:
status = "" if agent["success"] else ""
print(f" {status} {agent['agent']}: {agent['message']}")
# Without Agent Mail, we expect potential for duplicates
# (though timing may occasionally prevent it)
if len(successful_agents) > 1:
print(f"\n⚠️ EXPECTED: Multiple agents ({len(successful_agents)}) claimed same issue")
print(" This demonstrates the collision problem Agent Mail prevents")
else:
print("\n⚠️ NOTE: Only one agent succeeded (timing prevented collision this run)")
print(" Without Agent Mail, collisions are possible but not guaranteed")
return True
def test_agent_race_stress_test():
"""Stress test with many agents."""
print("\n" + "="*70)
print("TEST 3: Stress test with 10 agents (Agent Mail enabled)")
print("="*70)
num_agents = 10
result = run_race_test(num_agents, mail_enabled=True)
successful_agents = [a for a in result["agents"] if a["success"]]
print(f"\n📊 Results ({result['elapsed_seconds']:.3f}s):")
print(f" • Total agents: {num_agents}")
print(f" • Successful claims: {len(successful_agents)}")
print(f" • JSONL status changes: {len(result['jsonl_status_changes'])}")
# Verify: Exactly one winner
assert len(successful_agents) == 1, \
f"Expected 1 successful claim, got {len(successful_agents)}"
assert len(result['jsonl_status_changes']) == 1, \
f"Expected 1 JSONL status change, got {len(result['jsonl_status_changes'])}"
print(f"\n✅ PASS: Only {successful_agents[0]['agent']} succeeded")
return True
def check_agent_mail_server() -> bool:
"""Check if Agent Mail server is running."""
try:
import urllib.request
req = urllib.request.Request("http://localhost:8765/api/health")
with urllib.request.urlopen(req, timeout=1) as response:
return response.status == 200
except:
return False
def main():
"""Run all race condition tests."""
print("🧪 Multi-Agent Race Condition Test Suite")
print("Testing collision prevention with Agent Mail")
try:
# Check if bd is available
subprocess.run(["bd", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ ERROR: bd command not found")
print(" Install: go install github.com/steveyegge/beads/cmd/bd@latest")
sys.exit(1)
# Check if Agent Mail server is running
agent_mail_running = check_agent_mail_server()
if not agent_mail_running:
print("\n⚠️ WARNING: Agent Mail server is not running")
print(" Tests will fall back to beads-only mode (demonstrating collision)")
print("\n To enable full collision prevention testing:")
print(" $ cd ~/src/mcp_agent_mail")
print(" $ source .venv/bin/activate")
print(" $ uv run python -m mcp_agent_mail.cli serve-http")
print()
# Check if running in non-interactive mode (CI/automation)
if not sys.stdin.isatty():
print(" Running in non-interactive mode, continuing with tests...")
else:
print(" Press Enter to continue or Ctrl+C to exit")
try:
input()
except KeyboardInterrupt:
print("\n\n👋 Exiting - start Agent Mail server and try again")
sys.exit(0)
else:
print("\n✅ Agent Mail server is running on http://localhost:8765")
# Run tests
tests = [
("Agent Mail enabled (collision prevention)", test_agent_race_with_mail),
("Agent Mail disabled (collision demonstration)", test_agent_race_without_mail),
("Stress test (10 agents)", test_agent_race_stress_test),
]
passed = 0
failed = 0
for name, test_func in tests:
try:
if test_func():
passed += 1
except AssertionError as e:
print(f"\n❌ FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f"\n💥 ERROR in {name}: {e}")
failed += 1
# Summary
print("\n" + "="*70)
print("SUMMARY")
print("="*70)
print(f"✅ Passed: {passed}/{len(tests)}")
print(f"❌ Failed: {failed}/{len(tests)}")
if failed == 0:
print("\n🎉 All tests passed!")
sys.exit(0)
else:
print(f"\n⚠️ {failed} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,797 +0,0 @@
#!/usr/bin/env python3
"""
Agent Mail Server Failure Scenarios Test Suite
Tests verify graceful degradation across various failure modes:
- Server never started (connection refused)
- Server crashes during operation (connection reset)
- Network partition (timeout)
- Server returns 500 errors
- Invalid bearer token (401/403)
- Malformed responses
Validates:
- Agents continue working in Beads-only mode
- Clear log messages about degradation
- No crashes or data loss
- JSONL remains consistent
Performance notes:
- Uses 1s HTTP timeouts for fast failure detection
- Uses --no-daemon flag to avoid 5s debounce delays
- Mock HTTP server with minimal overhead
- Each test ~2-5s (much faster without daemon)
- Full suite ~15-30s (7 tests with workspace setup)
"""
import json
import subprocess
import tempfile
import shutil
import os
import sys
import time
import logging
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from typing import Optional, Dict, Any, List
import socket
# Add lib directory for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
# Configure logging (WARNING to reduce noise)
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Fast timeout for tests (1s instead of default 5s)
TEST_TIMEOUT = 1
class MockAgentMailServer:
"""Mock Agent Mail server for testing various failure scenarios."""
def __init__(self, port: int = 0, failure_mode: Optional[str] = None):
"""
Initialize mock server.
Args:
port: Port to listen on (0 = auto-assign)
failure_mode: Type of failure to simulate:
- None: Normal operation
- "500_error": Always return 500
- "timeout": Hang requests indefinitely
- "invalid_json": Return malformed JSON
- "crash_after_health": Crash after first health check
"""
self.port = port
self.failure_mode = failure_mode
self.server: Optional[HTTPServer] = None
self.thread: Optional[Thread] = None
self.request_count = 0
self.crash_triggered = False
def start(self) -> int:
"""Start the mock server. Returns actual port number."""
handler_class = self._create_handler()
# Find available port if port=0
if self.port == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
self.port = s.getsockname()[1]
self.server = HTTPServer(('127.0.0.1', self.port), handler_class)
self.thread = Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
# Wait for server to be ready
time.sleep(0.1)
logger.info(f"Mock Agent Mail server started on port {self.port} (mode={self.failure_mode})")
return self.port
def stop(self):
"""Stop the mock server."""
if self.server:
self.server.shutdown()
self.server.server_close()
logger.info(f"Mock Agent Mail server stopped (handled {self.request_count} requests)")
def crash(self):
"""Simulate server crash."""
self.crash_triggered = True
self.stop()
logger.info("Mock Agent Mail server CRASHED")
def _create_handler(self):
"""Create request handler class with access to server state."""
parent = self
class MockHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def do_GET(self):
parent.request_count += 1
# Handle crash_after_health mode
if parent.failure_mode == "crash_after_health" and parent.request_count > 1:
parent.crash()
return
# Handle timeout mode (hang long enough to trigger timeout)
if parent.failure_mode == "timeout":
time.sleep(10) # Hang longer than test timeout
return
# Handle 500 error mode
if parent.failure_mode == "500_error":
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"error": "Internal server error"}).encode())
return
# Normal health check response
if self.path == "/api/health":
response = {"status": "ok"}
if parent.failure_mode == "invalid_json":
# Return malformed JSON
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{invalid json')
return
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
parent.request_count += 1
# Read request body
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
body = self.rfile.read(content_length)
# Check authorization for invalid_token mode
if parent.failure_mode == "invalid_token":
auth = self.headers.get('Authorization', '')
if not auth or auth != "Bearer valid_token":
self.send_response(401)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"error": "Invalid token"}).encode())
return
# Handle timeout mode (hang long enough to trigger timeout)
if parent.failure_mode == "timeout":
time.sleep(10) # Hang longer than test timeout
return
# Handle 500 error mode
if parent.failure_mode == "500_error":
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"error": "Internal server error"}).encode())
return
# Normal responses for reservations/notifications
if self.path == "/api/reservations":
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "reserved"}).encode())
elif self.path == "/api/notifications":
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "sent"}).encode())
else:
self.send_response(404)
self.end_headers()
def do_DELETE(self):
parent.request_count += 1
# Handle timeout mode (hang long enough to trigger timeout)
if parent.failure_mode == "timeout":
time.sleep(10) # Hang longer than test timeout
return
# Normal release response
self.send_response(204)
self.end_headers()
return MockHandler
class TestAgent:
"""Test agent that performs basic bd operations."""
def __init__(self, workspace: str, agent_name: str = "test-agent",
mail_url: Optional[str] = None, mail_token: Optional[str] = None):
self.workspace = workspace
self.agent_name = agent_name
self.mail_url = mail_url
self.mail_token = mail_token
# Initialize adapter if URL provided
if mail_url:
self.mail = AgentMailAdapter(
url=mail_url,
token=mail_token,
agent_name=agent_name,
timeout=TEST_TIMEOUT # Use global test timeout
)
else:
self.mail = None
def run_bd(self, *args) -> dict:
"""Run bd command and return JSON output."""
# Use --no-daemon for fast tests (avoid 5s debounce timer)
cmd = ["bd", "--no-daemon"] + list(args) + ["--json"]
result = subprocess.run(
cmd,
cwd=self.workspace,
capture_output=True,
text=True
)
if result.returncode != 0:
return {"error": result.stderr}
if result.stdout.strip():
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {"error": "Invalid JSON", "output": result.stdout}
return {}
def create_issue(self, title: str, priority: int = 1) -> Optional[str]:
"""Create an issue and return its ID."""
result = self.run_bd("create", title, "-p", str(priority))
if "error" in result:
logger.error(f"Failed to create issue: {result['error']}")
return None
return result.get("id")
def claim_issue(self, issue_id: str) -> bool:
"""Attempt to claim an issue (with optional reservation)."""
# Try to reserve if Agent Mail is enabled
if self.mail and self.mail.enabled:
reserved = self.mail.reserve_issue(issue_id)
if not reserved:
logger.warning(f"Failed to reserve {issue_id}")
return False
# Update status
result = self.run_bd("update", issue_id, "--status", "in_progress")
if "error" in result:
logger.error(f"Failed to claim {issue_id}: {result['error']}")
if self.mail and self.mail.enabled:
self.mail.release_issue(issue_id)
return False
return True
def complete_issue(self, issue_id: str) -> bool:
"""Complete an issue."""
result = self.run_bd("close", issue_id, "--reason", "Done")
if "error" in result:
logger.error(f"Failed to complete {issue_id}: {result['error']}")
return False
# Release reservation if Agent Mail enabled
if self.mail and self.mail.enabled:
self.mail.release_issue(issue_id)
return True
def verify_jsonl_consistency(workspace: str) -> Dict[str, Any]:
"""
Verify JSONL file is valid and consistent.
Returns dict with:
- valid: bool
- issue_count: int
- errors: list of error messages
"""
jsonl_path = Path(workspace) / ".beads" / "issues.jsonl"
if not jsonl_path.exists():
return {"valid": False, "issue_count": 0, "errors": ["JSONL file does not exist"]}
issues = {}
errors = []
try:
with open(jsonl_path) as f:
for line_num, line in enumerate(f, 1):
if not line.strip():
continue
try:
record = json.loads(line)
issue_id = record.get("id")
if not issue_id:
errors.append(f"Line {line_num}: Missing issue ID")
continue
issues[issue_id] = record
except json.JSONDecodeError as e:
errors.append(f"Line {line_num}: Invalid JSON - {e}")
except Exception as e:
errors.append(f"Failed to read JSONL: {e}")
return {"valid": False, "issue_count": 0, "errors": errors}
return {
"valid": len(errors) == 0,
"issue_count": len(issues),
"errors": errors
}
def test_server_never_started():
"""Test that agents work when Agent Mail server is not running."""
print("\n" + "="*70)
print("TEST 1: Server Never Started (Connection Refused)")
print("="*70)
test_start = time.time()
workspace = tempfile.mkdtemp(prefix="bd-test-noserver-")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Create agent with non-existent server
agent = TestAgent(workspace, "test-agent", mail_url="http://127.0.0.1:9999")
# Verify Agent Mail is disabled
assert agent.mail is not None, "Agent Mail adapter should exist"
assert not agent.mail.enabled, "Agent Mail should be disabled (server not running)"
# Perform normal operations
issue_id = agent.create_issue("Test issue when server down")
assert issue_id is not None, "Should create issue without Agent Mail"
claimed = agent.claim_issue(issue_id)
assert claimed, "Should claim issue without Agent Mail"
completed = agent.complete_issue(issue_id)
assert completed, "Should complete issue without Agent Mail"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
assert jsonl_check["issue_count"] == 1, "Should have 1 issue in JSONL"
test_elapsed = time.time() - test_start
print("✅ PASS: Agent worked correctly without server")
print(f" • Created, claimed, and completed issue: {issue_id}")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
print(f" • Test duration: {test_elapsed:.2f}s")
return True
finally:
shutil.rmtree(workspace, ignore_errors=True)
def test_server_crash_during_operation():
"""Test that agents handle server crash gracefully."""
print("\n" + "="*70)
print("TEST 2: Server Crashes During Operation")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-crash-")
server = MockAgentMailServer(failure_mode="crash_after_health")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
# Verify Agent Mail is initially enabled
assert agent.mail.enabled, "Agent Mail should be enabled initially"
# Create issue (triggers health check, count=1)
issue_id = agent.create_issue("Test issue before crash")
assert issue_id is not None, "Should create issue before crash"
# Server will crash on next request (count=2)
# Agent should handle gracefully and continue in Beads-only mode
claimed = agent.claim_issue(issue_id)
assert claimed, "Should claim issue even after server crash"
completed = agent.complete_issue(issue_id)
assert completed, "Should complete issue after server crash"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
print("✅ PASS: Agent handled server crash gracefully")
print(f" • Server crashed after request #{server.request_count}")
print(f" • Agent continued in Beads-only mode")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_network_partition_timeout():
"""Test that agents handle network timeouts without blocking indefinitely."""
print("\n" + "="*70)
print("TEST 3: Network Partition (Timeout)")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-timeout-")
server = MockAgentMailServer(failure_mode="timeout")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server (will hang all requests)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Measure how long initialization takes (includes health check timeout)
init_start = time.time()
# Create agent with short timeout (2s set in TestAgent)
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
init_elapsed = time.time() - init_start
# Agent Mail should be disabled after health check timeout
# The health check itself will take ~2s to timeout
assert not agent.mail.enabled, "Agent Mail should be disabled (health check timeout)"
# Operations should proceed quickly in Beads-only mode (no more server calls)
ops_start = time.time()
issue_id = agent.create_issue("Test issue with timeout")
claimed = agent.claim_issue(issue_id)
ops_elapsed = time.time() - ops_start
# Operations should be fast (not waiting on server) - allow up to 15s for bd commands
assert ops_elapsed < 15, f"Operations took too long: {ops_elapsed:.2f}s (should be quick in Beads-only mode)"
assert issue_id is not None, "Should create issue despite timeout"
assert claimed, "Should claim issue despite timeout"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
print("✅ PASS: Agent handled network timeout gracefully")
print(f" • Health check timeout: {init_elapsed:.2f}s")
print(f" • Operations completed in {ops_elapsed:.2f}s (Beads-only mode)")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_server_500_errors():
"""Test that agents handle 500 errors gracefully."""
print("\n" + "="*70)
print("TEST 4: Server Returns 500 Errors")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-500-")
server = MockAgentMailServer(failure_mode="500_error")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server (returns 500 for all requests)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
# Agent Mail should be disabled (health check returns 500)
assert not agent.mail.enabled, "Agent Mail should be disabled (500 error)"
# Operations should work in Beads-only mode
issue_id = agent.create_issue("Test issue with 500 errors")
assert issue_id is not None, "Should create issue despite 500 errors"
claimed = agent.claim_issue(issue_id)
assert claimed, "Should claim issue despite 500 errors"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
print("✅ PASS: Agent handled 500 errors gracefully")
print(f" • Server returned {server.request_count} 500 errors")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_invalid_bearer_token():
"""Test that agents handle invalid bearer token (401) gracefully."""
print("\n" + "="*70)
print("TEST 5: Invalid Bearer Token (401)")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-token-")
server = MockAgentMailServer(failure_mode="invalid_token")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server (requires "Bearer valid_token")
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent with invalid token
agent = TestAgent(workspace, "test-agent", mail_url=mail_url, mail_token="invalid_token")
# Note: The health check endpoint doesn't require auth in our mock server,
# so Agent Mail may be enabled initially. However, reservation requests
# will fail with 401, causing graceful degradation.
# This tests that the adapter handles auth failures during actual operations.
# Operations should work (graceful degradation on auth failure)
issue_id = agent.create_issue("Test issue with invalid token")
assert issue_id is not None, "Should create issue despite auth issues"
claimed = agent.claim_issue(issue_id)
assert claimed, "Should claim issue (reservation may fail but claim succeeds)"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
print("✅ PASS: Agent handled invalid token gracefully")
print(f" • Server requests: {server.request_count}")
print(f" • Agent Mail enabled: {agent.mail.enabled}")
print(f" • Operations succeeded via graceful degradation")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_malformed_json_response():
"""Test that agents handle malformed JSON responses gracefully."""
print("\n" + "="*70)
print("TEST 6: Malformed JSON Response")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-badjson-")
server = MockAgentMailServer(failure_mode="invalid_json")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server (returns malformed JSON)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
# Agent Mail should be disabled (malformed health check response)
assert not agent.mail.enabled, "Agent Mail should be disabled (invalid JSON)"
# Operations should work in Beads-only mode
issue_id = agent.create_issue("Test issue with malformed JSON")
assert issue_id is not None, "Should create issue despite malformed JSON"
claimed = agent.claim_issue(issue_id)
assert claimed, "Should claim issue despite malformed JSON"
# Verify JSONL consistency
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
print("✅ PASS: Agent handled malformed JSON gracefully")
print(f" • JSONL valid with {jsonl_check['issue_count']} issue(s)")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_jsonl_consistency_under_failures():
"""Test JSONL remains consistent across multiple failure scenarios."""
print("\n" + "="*70)
print("TEST 7: JSONL Consistency Under Multiple Failures")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-consistency-")
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Scenario 1: No server
agent1 = TestAgent(workspace, "agent1", mail_url="http://127.0.0.1:9999")
id1 = agent1.create_issue("Issue 1 - no server")
agent1.claim_issue(id1)
# Scenario 2: Server crash
server2 = MockAgentMailServer(failure_mode="crash_after_health")
port2 = server2.start()
agent2 = TestAgent(workspace, "agent2", mail_url=f"http://127.0.0.1:{port2}")
id2 = agent2.create_issue("Issue 2 - server crash")
agent2.claim_issue(id2) # Triggers crash
server2.stop()
# Scenario 3: 500 errors
server3 = MockAgentMailServer(failure_mode="500_error")
port3 = server3.start()
agent3 = TestAgent(workspace, "agent3", mail_url=f"http://127.0.0.1:{port3}")
id3 = agent3.create_issue("Issue 3 - 500 errors")
agent3.claim_issue(id3)
server3.stop()
# Verify JSONL is still consistent
jsonl_check = verify_jsonl_consistency(workspace)
assert jsonl_check["valid"], f"JSONL should be valid: {jsonl_check['errors']}"
assert jsonl_check["issue_count"] == 3, f"Expected 3 issues, got {jsonl_check['issue_count']}"
# Verify we can still read issues with bd
result = subprocess.run(
["bd", "list", "--json"],
cwd=workspace,
capture_output=True,
text=True,
check=True
)
issues = json.loads(result.stdout)
assert len(issues) == 3, f"Expected 3 issues from bd list, got {len(issues)}"
print("✅ PASS: JSONL remained consistent across all failure scenarios")
print(f" • Created 3 issues across 3 different failure modes")
print(f" • JSONL valid with {jsonl_check['issue_count']} issues")
print(f" • All issues readable via bd CLI")
return True
finally:
shutil.rmtree(workspace, ignore_errors=True)
def main():
"""Run all failure scenario tests."""
print("🧪 Agent Mail Server Failure Scenarios Test Suite")
print("Testing graceful degradation across various failure modes")
# Check if bd is available
try:
subprocess.run(["bd", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ ERROR: bd command not found")
print(" Install: go install github.com/steveyegge/beads/cmd/bd@latest")
sys.exit(1)
# Run tests
tests = [
("Server never started", test_server_never_started),
("Server crash during operation", test_server_crash_during_operation),
("Network partition timeout", test_network_partition_timeout),
("Server 500 errors", test_server_500_errors),
("Invalid bearer token", test_invalid_bearer_token),
("Malformed JSON response", test_malformed_json_response),
("JSONL consistency under failures", test_jsonl_consistency_under_failures),
]
passed = 0
failed = 0
start_time = time.time()
for name, test_func in tests:
try:
if test_func():
passed += 1
except AssertionError as e:
print(f"\n❌ FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f"\n💥 ERROR in {name}: {e}")
import traceback
traceback.print_exc()
failed += 1
elapsed = time.time() - start_time
# Summary
print("\n" + "="*70)
print("SUMMARY")
print("="*70)
print(f"✅ Passed: {passed}/{len(tests)}")
print(f"❌ Failed: {failed}/{len(tests)}")
print(f"⏱️ Total time: {elapsed:.2f}s")
if failed == 0:
print("\n🎉 All failure scenario tests passed!")
print(" Agents gracefully degrade to Beads-only mode in all failure cases")
sys.exit(0)
else:
print(f"\n⚠️ {failed} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,510 +0,0 @@
#!/usr/bin/env python3
"""
Multi-Agent Coordination Test Suite
Fast tests (<30s total) covering critical multi-agent scenarios:
- Fairness: N agents claiming M issues
- Notifications: End-to-end message passing
- Handoff: Release → immediate claim by another agent
- Idempotency: Double operations by same agent
"""
import json
import subprocess
import tempfile
import shutil
import sys
import time
from pathlib import Path
from multiprocessing import Process, Queue
from threading import Thread, Lock
from http.server import HTTPServer, BaseHTTPRequestHandler
import socket
# Add lib directory for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
class MockAgentMailServer:
"""Lightweight mock server with reservations and notifications."""
def __init__(self, port: int = 0):
self.port = port
self.server = None
self.thread = None
self.reservations = {} # file_path -> agent_name
self.notifications = {} # agent_name -> [messages]
self.lock = Lock()
def start(self) -> int:
"""Start server and return port."""
handler = self._create_handler()
if self.port == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
self.port = s.getsockname()[1]
self.server = HTTPServer(('127.0.0.1', self.port), handler)
self.thread = Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
time.sleep(0.1)
return self.port
def stop(self):
if self.server:
self.server.shutdown()
self.server.server_close()
def _create_handler(self):
parent = self
class Handler(BaseHTTPRequestHandler):
def log_message(self, *args):
pass
def do_GET(self):
if self.path == "/api/health":
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "ok"}')
# Get inbox: /api/notifications/{agent_name}
elif self.path.startswith("/api/notifications/"):
agent_name = self.path.split('/')[-1]
with parent.lock:
messages = parent.notifications.get(agent_name, [])
parent.notifications[agent_name] = [] # Clear after read
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(messages).encode())
elif self.path == "/api/reservations":
with parent.lock:
res_list = [
{"file_path": fp, "agent_name": agent}
for fp, agent in parent.reservations.items()
]
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(res_list).encode())
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b'{}'
data = json.loads(body.decode('utf-8'))
# Reserve: /api/reservations
if self.path == "/api/reservations":
file_path = data.get("file_path")
agent_name = data.get("agent_name")
with parent.lock:
if file_path in parent.reservations:
existing = parent.reservations[file_path]
if existing != agent_name:
# Conflict
self.send_response(409)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({
"error": f"Already reserved by {existing}"
}).encode())
return
# else: same agent re-reserving (idempotent)
parent.reservations[file_path] = agent_name
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "reserved"}')
# Notify: /api/notifications
elif self.path == "/api/notifications":
from_agent = data.get("from_agent")
event_type = data.get("event_type")
payload = data.get("payload", {})
# Broadcast to all OTHER agents
with parent.lock:
for agent_name in list(parent.notifications.keys()):
if agent_name != from_agent:
parent.notifications[agent_name].append({
"from": from_agent,
"event": event_type,
"data": payload
})
# If target agent specified, ensure they get it
to_agent = payload.get("to_agent")
if to_agent and to_agent not in parent.notifications:
parent.notifications[to_agent] = [{
"from": from_agent,
"event": event_type,
"data": payload
}]
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "sent"}')
else:
self.send_response(404)
self.end_headers()
def do_DELETE(self):
# Release: /api/reservations/{agent}/{issue_id}
parts = self.path.split('/')
if len(parts) >= 5:
agent_name = parts[3]
issue_id = parts[4]
file_path = f".beads/issues/{issue_id}"
with parent.lock:
if file_path in parent.reservations:
if parent.reservations[file_path] == agent_name:
del parent.reservations[file_path]
self.send_response(204)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
return Handler
class TestAgent:
"""Minimal test agent."""
def __init__(self, workspace: str, agent_name: str, mail_url: str):
self.workspace = workspace
self.agent_name = agent_name
self.mail = AgentMailAdapter(url=mail_url, agent_name=agent_name, timeout=2)
def run_bd(self, *args):
cmd = ["bd", "--no-daemon"] + list(args) + ["--json"]
result = subprocess.run(cmd, cwd=self.workspace, capture_output=True, text=True)
if result.returncode != 0:
return {"error": result.stderr}
if result.stdout.strip():
try:
return json.loads(result.stdout)
except:
return {"error": "Invalid JSON"}
return {}
def create_issue(self, title: str) -> str:
result = self.run_bd("create", title, "-p", "1")
return result.get("id")
def claim_issue(self, issue_id: str) -> bool:
if self.mail.enabled and not self.mail.reserve_issue(issue_id):
return False
result = self.run_bd("update", issue_id, "--status", "in_progress")
return "error" not in result
def release_issue(self, issue_id: str):
if self.mail.enabled:
self.mail.release_issue(issue_id)
def agent_claim_worker(agent_name: str, workspace: str, issue_id: str,
mail_url: str, result_queue: Queue):
"""Worker that tries to claim a single issue."""
try:
agent = TestAgent(workspace, agent_name, mail_url)
success = agent.claim_issue(issue_id)
result_queue.put({"agent": agent_name, "issue": issue_id, "success": success})
except Exception as e:
result_queue.put({"agent": agent_name, "issue": issue_id, "success": False, "error": str(e)})
def test_fairness_n_agents_m_issues():
"""Test that N agents competing for M issues results in exactly M claims."""
print("\n" + "="*70)
print("TEST 1: Fairness - 10 agents, 5 issues")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-fairness-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create 5 issues
agent = TestAgent(workspace, "setup", mail_url)
issues = [agent.create_issue(f"Issue {i+1}") for i in range(5)]
# Spawn 10 agents trying to claim all 5 issues
result_queue = Queue()
processes = []
for agent_num in range(10):
for issue_id in issues:
p = Process(target=agent_claim_worker,
args=(f"agent-{agent_num}", workspace, issue_id, mail_url, result_queue))
processes.append(p)
# Start all at once
for p in processes:
p.start()
for p in processes:
p.join(timeout=10)
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
# Count successful claims per issue
claims_per_issue = {}
for r in results:
if r["success"]:
issue = r["issue"]
claims_per_issue[issue] = claims_per_issue.get(issue, 0) + 1
print(f" • Total attempts: {len(results)}")
print(f" • Successful claims: {sum(claims_per_issue.values())}")
print(f" • Claims per issue: {claims_per_issue}")
# Verify exactly 1 claim per issue
for issue_id in issues:
claims = claims_per_issue.get(issue_id, 0)
assert claims == 1, f"Issue {issue_id} claimed {claims} times (expected 1)"
print("✅ PASS: Each issue claimed exactly once")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_notification_end_to_end():
"""Test notifications from agent1 to agent2."""
print("\n" + "="*70)
print("TEST 2: Notification End-to-End")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-notify-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create two agents
agent1 = TestAgent(workspace, "agent1", mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url)
# Register agent2's inbox
server.notifications["agent2"] = []
# Agent1 sends notification
sent = agent1.mail.notify("task_completed", {
"issue_id": "bd-123",
"status": "done",
"to_agent": "agent2"
})
assert sent, "Should send notification"
# Agent2 checks inbox
messages = agent2.mail.check_inbox()
print(f" • Agent1 sent notification")
print(f" • Agent2 received {len(messages)} message(s)")
assert len(messages) == 1, f"Expected 1 message, got {len(messages)}"
assert messages[0]["from"] == "agent1"
assert messages[0]["event"] == "task_completed"
assert messages[0]["data"]["issue_id"] == "bd-123"
# Second check should be empty (messages consumed)
messages2 = agent2.mail.check_inbox()
assert len(messages2) == 0, "Inbox should be empty after read"
print("✅ PASS: Notification delivered correctly")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_reservation_handoff():
"""Test immediate claim after release (handoff scenario)."""
print("\n" + "="*70)
print("TEST 3: Reservation Handoff")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-handoff-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
agent1 = TestAgent(workspace, "agent1", mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url)
# Agent1 creates and claims issue
issue_id = agent1.create_issue("Handoff test")
claimed1 = agent1.claim_issue(issue_id)
assert claimed1, "Agent1 should claim issue"
# Agent2 tries to claim (should fail - reserved)
claimed2_before = agent2.claim_issue(issue_id)
assert not claimed2_before, "Agent2 should be blocked"
# Agent1 releases
agent1.release_issue(issue_id)
# Agent2 immediately claims (handoff)
claimed2_after = agent2.claim_issue(issue_id)
assert claimed2_after, "Agent2 should claim after release"
# Verify reservation ownership
reservations = agent2.mail.get_reservations()
assert len(reservations) == 1
assert reservations[0]["agent_name"] == "agent2"
print("✅ PASS: Clean handoff from agent1 to agent2")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_idempotent_operations():
"""Test double reserve and double release by same agent."""
print("\n" + "="*70)
print("TEST 4: Idempotent Operations")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-idem-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
agent = TestAgent(workspace, "agent1", mail_url)
issue_id = agent.create_issue("Idempotency test")
# Reserve twice (idempotent)
reserve1 = agent.mail.reserve_issue(issue_id)
reserve2 = agent.mail.reserve_issue(issue_id)
assert reserve1, "First reserve should succeed"
assert reserve2, "Second reserve should be idempotent (same agent)"
# Verify only one reservation
reservations = agent.mail.get_reservations()
assert len(reservations) == 1, f"Should have 1 reservation, got {len(reservations)}"
# Release twice (idempotent)
release1 = agent.mail.release_issue(issue_id)
release2 = agent.mail.release_issue(issue_id)
assert release1, "First release should succeed"
assert release2, "Second release should be idempotent (no error)"
# Verify no reservations
reservations_after = agent.mail.get_reservations()
assert len(reservations_after) == 0, "Should have 0 reservations after release"
print("✅ PASS: Double reserve and release are idempotent")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def main():
"""Run coordination tests."""
print("🧪 Multi-Agent Coordination Test Suite")
print("Fast tests for critical coordination scenarios")
try:
subprocess.run(["bd", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ ERROR: bd command not found")
sys.exit(1)
tests = [
("Fairness (10 agents, 5 issues)", test_fairness_n_agents_m_issues),
("Notification end-to-end", test_notification_end_to_end),
("Reservation handoff", test_reservation_handoff),
("Idempotent operations", test_idempotent_operations),
]
passed = 0
failed = 0
start_time = time.time()
for name, test_func in tests:
try:
if test_func():
passed += 1
except AssertionError as e:
print(f"\n❌ FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f"\n💥 ERROR in {name}: {e}")
import traceback
traceback.print_exc()
failed += 1
elapsed = time.time() - start_time
print("\n" + "="*70)
print("SUMMARY")
print("="*70)
print(f"✅ Passed: {passed}/{len(tests)}")
print(f"❌ Failed: {failed}/{len(tests)}")
print(f"⏱️ Total time: {elapsed:.1f}s")
if failed == 0:
print("\n🎉 All coordination tests passed!")
sys.exit(0)
else:
print(f"\n⚠️ {failed} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,635 +0,0 @@
#!/usr/bin/env python3
"""
Reservation TTL and Expiration Test Suite
Tests verify time-based reservation behavior:
- Short TTL reservations (30s)
- Reservation blocking verification
- Auto-release after expiration
- Renewal/heartbeat mechanisms
Performance notes:
- Uses 30s TTL for expiration tests (fast enough for CI)
- Uses mock HTTP server with minimal overhead
- Each test ~30-60s (waiting for expiration)
"""
import json
import subprocess
import tempfile
import shutil
import os
import sys
import time
import logging
from pathlib import Path
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread, Lock
from typing import Optional, Dict, Any, List
import socket
from datetime import datetime, timedelta
# Add lib directory for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
# Configure logging
logging.basicConfig(
level=logging.WARNING,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Test configuration
TEST_TIMEOUT = 2 # HTTP timeout
SHORT_TTL = 30 # Short TTL for expiration tests (30 seconds)
class Reservation:
"""Represents a file reservation with TTL."""
def __init__(self, file_path: str, agent_name: str, ttl: int):
self.file_path = file_path
self.agent_name = agent_name
self.expires_at = datetime.now() + timedelta(seconds=ttl)
self.created_at = datetime.now()
def is_expired(self) -> bool:
"""Check if reservation has expired."""
return datetime.now() >= self.expires_at
def renew(self, ttl: int) -> None:
"""Renew reservation with new TTL."""
self.expires_at = datetime.now() + timedelta(seconds=ttl)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"file_path": self.file_path,
"agent_name": self.agent_name,
"expires_at": self.expires_at.isoformat(),
"created_at": self.created_at.isoformat()
}
class MockAgentMailServer:
"""Mock Agent Mail server with TTL-based reservation management."""
def __init__(self, port: int = 0):
self.port = port
self.server: Optional[HTTPServer] = None
self.thread: Optional[Thread] = None
self.reservations: Dict[str, Reservation] = {} # file_path -> Reservation
self.lock = Lock() # Thread-safe access to reservations
self.request_count = 0
def start(self) -> int:
"""Start the mock server. Returns actual port number."""
handler_class = self._create_handler()
# Find available port if port=0
if self.port == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
self.port = s.getsockname()[1]
self.server = HTTPServer(('127.0.0.1', self.port), handler_class)
self.thread = Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
# Wait for server to be ready
time.sleep(0.1)
logger.info(f"Mock Agent Mail server started on port {self.port}")
return self.port
def stop(self):
"""Stop the mock server."""
if self.server:
self.server.shutdown()
self.server.server_close()
logger.info(f"Mock Agent Mail server stopped")
def _cleanup_expired(self) -> None:
"""Remove expired reservations."""
with self.lock:
expired = [path for path, res in self.reservations.items() if res.is_expired()]
for path in expired:
del self.reservations[path]
logger.debug(f"Auto-released expired reservation: {path}")
def _create_handler(self):
"""Create request handler class with access to server state."""
parent = self
class MockHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def do_GET(self):
parent.request_count += 1
parent._cleanup_expired() # Clean up expired reservations
# Health check
if self.path == "/api/health":
response = {"status": "ok"}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
# Get all reservations
elif self.path == "/api/reservations":
with parent.lock:
reservations = [res.to_dict() for res in parent.reservations.values()]
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"reservations": reservations}).encode())
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
parent.request_count += 1
parent._cleanup_expired() # Clean up expired reservations
# Read request body
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b'{}'
try:
data = json.loads(body.decode('utf-8'))
except json.JSONDecodeError:
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"error": "Invalid JSON"}).encode())
return
# Create/renew reservation
if self.path == "/api/reservations":
file_path = data.get("file_path")
agent_name = data.get("agent_name")
ttl = data.get("ttl", 3600)
if not file_path or not agent_name:
self.send_response(400)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"error": "Missing file_path or agent_name"}).encode())
return
with parent.lock:
# Check if already reserved by another agent
if file_path in parent.reservations:
existing = parent.reservations[file_path]
if existing.agent_name != agent_name:
# Conflict: already reserved by another agent
self.send_response(409)
self.send_header('Content-Type', 'application/json')
self.end_headers()
error_msg = f"File already reserved by {existing.agent_name}"
self.wfile.write(json.dumps({"error": error_msg}).encode())
return
else:
# Renewal: same agent re-reserving (heartbeat)
existing.renew(ttl)
logger.debug(f"Renewed reservation: {file_path} by {agent_name}")
else:
# New reservation
parent.reservations[file_path] = Reservation(file_path, agent_name, ttl)
logger.debug(f"Created reservation: {file_path} by {agent_name} (TTL={ttl}s)")
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({"status": "reserved"}).encode())
else:
self.send_response(404)
self.end_headers()
def do_DELETE(self):
parent.request_count += 1
parent._cleanup_expired() # Clean up expired reservations
# Release reservation: /api/reservations/{agent}/{issue_id}
# Extract file_path from URL (last component is issue_id)
parts = self.path.split('/')
if len(parts) >= 5 and parts[1] == "api" and parts[2] == "reservations":
agent_name = parts[3]
issue_id = parts[4]
file_path = f".beads/issues/{issue_id}"
with parent.lock:
if file_path in parent.reservations:
res = parent.reservations[file_path]
if res.agent_name == agent_name:
del parent.reservations[file_path]
logger.debug(f"Released reservation: {file_path}")
self.send_response(204)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
return MockHandler
class TestAgent:
"""Test agent that performs bd operations with reservation support."""
def __init__(self, workspace: str, agent_name: str = "test-agent",
mail_url: Optional[str] = None):
self.workspace = workspace
self.agent_name = agent_name
self.mail_url = mail_url
# Initialize adapter if URL provided
if mail_url:
self.mail = AgentMailAdapter(
url=mail_url,
agent_name=agent_name,
timeout=TEST_TIMEOUT
)
else:
self.mail = None
def run_bd(self, *args) -> dict:
"""Run bd command and return JSON output."""
cmd = ["bd", "--no-daemon"] + list(args) + ["--json"]
result = subprocess.run(
cmd,
cwd=self.workspace,
capture_output=True,
text=True
)
if result.returncode != 0:
return {"error": result.stderr}
if result.stdout.strip():
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {"error": "Invalid JSON", "output": result.stdout}
return {}
def create_issue(self, title: str, priority: int = 1) -> Optional[str]:
"""Create an issue and return its ID."""
result = self.run_bd("create", title, "-p", str(priority))
if "error" in result:
logger.error(f"Failed to create issue: {result['error']}")
return None
return result.get("id")
def claim_issue(self, issue_id: str, ttl: int = 3600) -> bool:
"""Attempt to claim an issue with optional reservation."""
# Try to reserve if Agent Mail is enabled
if self.mail and self.mail.enabled:
reserved = self.mail.reserve_issue(issue_id, ttl=ttl)
if not reserved:
logger.warning(f"Failed to reserve {issue_id}")
return False
# Update status
result = self.run_bd("update", issue_id, "--status", "in_progress")
if "error" in result:
logger.error(f"Failed to claim {issue_id}: {result['error']}")
if self.mail and self.mail.enabled:
self.mail.release_issue(issue_id)
return False
return True
def renew_reservation(self, issue_id: str, ttl: int = 3600) -> bool:
"""Renew reservation (heartbeat)."""
if self.mail and self.mail.enabled:
# Re-reserving with same agent acts as renewal
return self.mail.reserve_issue(issue_id, ttl=ttl)
return True
def test_short_ttl_reservation():
"""Test reservation with short TTL (30s)."""
print("\n" + "="*70)
print("TEST 1: Short TTL Reservation (30s)")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-ttl-")
server = MockAgentMailServer()
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
# Create and claim issue with short TTL
issue_id = agent.create_issue("Test short TTL reservation")
assert issue_id is not None, "Should create issue"
start_time = time.time()
claimed = agent.claim_issue(issue_id, ttl=SHORT_TTL)
assert claimed, f"Should claim issue with {SHORT_TTL}s TTL"
# Verify reservation exists
reservations = agent.mail.get_reservations()
assert len(reservations) == 1, f"Should have 1 reservation, got {len(reservations)}"
assert reservations[0]["agent_name"] == "test-agent", "Reservation should be owned by test-agent"
# Check TTL info
res = reservations[0]
expires_at = datetime.fromisoformat(res["expires_at"])
created_at = datetime.fromisoformat(res["created_at"])
actual_ttl = (expires_at - created_at).total_seconds()
print(f"✅ PASS: Created reservation with {SHORT_TTL}s TTL")
print(f" • Issue: {issue_id}")
print(f" • Actual TTL: {actual_ttl:.1f}s")
print(f" • Expires at: {expires_at.strftime('%H:%M:%S')}")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_reservation_blocking():
"""Test that reservation blocks other agents from claiming."""
print("\n" + "="*70)
print("TEST 2: Reservation Blocking Verification")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-block-")
server = MockAgentMailServer()
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create two agents
agent1 = TestAgent(workspace, "agent1", mail_url=mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url=mail_url)
# Agent 1 creates and claims issue
issue_id = agent1.create_issue("Test reservation blocking")
assert issue_id is not None, "Agent 1 should create issue"
claimed1 = agent1.claim_issue(issue_id, ttl=SHORT_TTL)
assert claimed1, "Agent 1 should claim issue"
# Agent 2 attempts to claim same issue (should fail)
claimed2 = agent2.claim_issue(issue_id, ttl=SHORT_TTL)
assert not claimed2, "Agent 2 should NOT be able to claim (blocked by reservation)"
# Verify only one reservation exists
reservations = agent1.mail.get_reservations()
assert len(reservations) == 1, f"Should have 1 reservation, got {len(reservations)}"
assert reservations[0]["agent_name"] == "agent1", "Reservation should be owned by agent1"
print("✅ PASS: Reservation successfully blocked other agent")
print(f" • Agent 1 claimed: {issue_id}")
print(f" • Agent 2 blocked by reservation")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_auto_release_after_expiration():
"""Test that reservation auto-releases after TTL expires."""
print("\n" + "="*70)
print("TEST 3: Auto-Release After Expiration")
print("="*70)
print(f" (This test waits {SHORT_TTL}s for expiration)")
workspace = tempfile.mkdtemp(prefix="bd-test-expire-")
server = MockAgentMailServer()
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create two agents
agent1 = TestAgent(workspace, "agent1", mail_url=mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url=mail_url)
# Agent 1 creates and claims issue with short TTL
issue_id = agent1.create_issue("Test auto-release")
assert issue_id is not None, "Agent 1 should create issue"
start_time = time.time()
claimed1 = agent1.claim_issue(issue_id, ttl=SHORT_TTL)
assert claimed1, "Agent 1 should claim issue"
# Verify reservation exists
reservations = agent1.mail.get_reservations()
assert len(reservations) == 1, "Should have 1 active reservation"
# Agent 2 attempts to claim (should fail - still reserved)
claimed2_before = agent2.claim_issue(issue_id, ttl=SHORT_TTL)
assert not claimed2_before, "Agent 2 should be blocked before expiration"
print(f" • Waiting {SHORT_TTL}s for reservation to expire...")
# Wait for TTL to expire (add 2s buffer for clock skew)
time.sleep(SHORT_TTL + 2)
elapsed = time.time() - start_time
# Verify reservation auto-released (next request cleans up expired)
reservations_after = agent2.mail.get_reservations() # Triggers cleanup
assert len(reservations_after) == 0, f"Reservation should have expired, got {len(reservations_after)}"
# Agent 2 should now be able to claim
claimed2_after = agent2.claim_issue(issue_id, ttl=SHORT_TTL)
assert claimed2_after, "Agent 2 should claim issue after expiration"
# Verify new reservation by agent2
final_reservations = agent2.mail.get_reservations()
assert len(final_reservations) == 1, "Should have 1 reservation after agent2 claims"
assert final_reservations[0]["agent_name"] == "agent2", "Reservation should be owned by agent2"
print(f"✅ PASS: Reservation auto-released after {elapsed:.1f}s")
print(f" • Agent 1 reservation expired")
print(f" • Agent 2 successfully claimed after expiration")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_renewal_heartbeat():
"""Test reservation renewal (heartbeat mechanism)."""
print("\n" + "="*70)
print("TEST 4: Renewal/Heartbeat Mechanism")
print("="*70)
print(f" (This test waits {SHORT_TTL // 2}s to test renewal)")
workspace = tempfile.mkdtemp(prefix="bd-test-renew-")
server = MockAgentMailServer()
try:
# Initialize workspace
subprocess.run(
["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace,
check=True,
capture_output=True
)
# Start server
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create agent
agent = TestAgent(workspace, "test-agent", mail_url=mail_url)
# Create and claim issue with short TTL
issue_id = agent.create_issue("Test renewal/heartbeat")
assert issue_id is not None, "Should create issue"
claimed = agent.claim_issue(issue_id, ttl=SHORT_TTL)
assert claimed, f"Should claim issue with {SHORT_TTL}s TTL"
# Get initial expiration time
reservations = agent.mail.get_reservations()
assert len(reservations) == 1, "Should have 1 reservation"
initial_expires = datetime.fromisoformat(reservations[0]["expires_at"])
print(f" • Initial expiration: {initial_expires.strftime('%H:%M:%S')}")
print(f" • Waiting {SHORT_TTL // 2}s before renewal...")
# Wait halfway through TTL
time.sleep(SHORT_TTL // 2)
# Renew reservation (heartbeat)
renewed = agent.renew_reservation(issue_id, ttl=SHORT_TTL)
assert renewed, "Should renew reservation"
# Get new expiration time
reservations_after = agent.mail.get_reservations()
assert len(reservations_after) == 1, "Should still have 1 reservation"
renewed_expires = datetime.fromisoformat(reservations_after[0]["expires_at"])
# Verify expiration was extended
extension = (renewed_expires - initial_expires).total_seconds()
print(f" • Renewed expiration: {renewed_expires.strftime('%H:%M:%S')}")
print(f" • Extension: {extension:.1f}s")
# Extension should be approximately TTL/2 (since we renewed halfway)
# Allow 5s tolerance for clock skew and processing time
expected_extension = SHORT_TTL // 2
assert abs(extension - expected_extension) < 5, \
f"Extension should be ~{expected_extension}s, got {extension:.1f}s"
print(f"✅ PASS: Reservation renewed successfully")
print(f" • Heartbeat extended expiration by {extension:.1f}s")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def main():
"""Run all TTL/expiration tests."""
print("🧪 Reservation TTL and Expiration Test Suite")
print(f"Testing time-based reservation behavior (SHORT_TTL={SHORT_TTL}s)")
# Check if bd is available
try:
subprocess.run(["bd", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ ERROR: bd command not found")
print(" Install: go install github.com/steveyegge/beads/cmd/bd@latest")
sys.exit(1)
# Run tests
tests = [
("Short TTL reservation", test_short_ttl_reservation),
("Reservation blocking", test_reservation_blocking),
("Auto-release after expiration", test_auto_release_after_expiration),
("Renewal/heartbeat mechanism", test_renewal_heartbeat),
]
passed = 0
failed = 0
start_time = time.time()
for name, test_func in tests:
try:
if test_func():
passed += 1
except AssertionError as e:
print(f"\n❌ FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f"\n💥 ERROR in {name}: {e}")
import traceback
traceback.print_exc()
failed += 1
elapsed = time.time() - start_time
# Summary
print("\n" + "="*70)
print("SUMMARY")
print("="*70)
print(f"✅ Passed: {passed}/{len(tests)}")
print(f"❌ Failed: {failed}/{len(tests)}")
print(f"⏱️ Total time: {elapsed:.1f}s")
if failed == 0:
print("\n🎉 All TTL/expiration tests passed!")
print(" Reservation expiration and renewal work correctly")
sys.exit(0)
else:
print(f"\n⚠️ {failed} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()