Add comprehensive Agent Mail coordination tests (bd-pdjb)

- Created test_multi_agent_coordination.py with 4 fast tests (<11s total)
- Tests cover fairness (10 agents, 5 issues), notifications, handoff, idempotency
- Documented complete test coverage in AGENT_MAIL_TEST_COVERAGE.md
- 66 total tests across 5 files validating multi-agent reliability
- Closed bd-pdjb (Testing & Validation epic)
This commit is contained in:
Steve Yegge
2025-11-08 02:48:05 -08:00
parent 08ae95be95
commit bc056cebcd
3 changed files with 988 additions and 285 deletions

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,196 @@
# Agent Mail Integration Test Coverage
## Test Suite Summary
**Total test time**: ~55 seconds (all suites)
**Total tests**: 66 tests across 5 files
## Coverage by Category
### 1. HTTP Adapter Unit Tests (`lib/test_beads_mail_adapter.py`)
**51 tests in 0.019s**
**Enabled/Disabled Mode**
- Server available vs unavailable
- Graceful degradation when server dies mid-operation
- Operations no-op when disabled
**Reservation Operations**
- Successful reservation (201)
- Conflict handling (409)
- Custom TTL support
- Multiple reservations by same agent
- Release operations (204)
- Double release idempotency
**HTTP Error Handling**
- 500 Internal Server Error
- 404 Not Found
- 409 Conflict with malformed body
- Network timeouts
- Malformed JSON responses
- Empty response bodies (204 No Content)
**Configuration**
- Environment variable configuration
- Constructor parameter overrides
- URL normalization (trailing slash removal)
- Default agent name from hostname
- Timeout configuration
**Authorization**
- Bearer token headers
- Missing token behavior
- Content-Type headers
**Request Validation**
- Body structure for reservations
- Body structure for notifications
- URL structure for releases
- URL structure for inbox checks
**Inbox & Notifications**
- Send notifications
- Check inbox with messages
- Empty inbox handling
- Dict wrapper responses
- Large message lists (100 messages)
- Nested payload data
- Empty and large payloads
- Unicode handling
### 2. Multi-Agent Race Conditions (`tests/integration/test_agent_race.py`)
**3 tests in ~15s**
**Collision Prevention**
- 3 agents competing for 1 issue (WITH Agent Mail)
- Only one winner with reservations
- Multiple agents without Agent Mail (collision demo)
**Stress Testing**
- 10 agents competing for 1 issue
- Exactly one winner guaranteed
- JSONL consistency verification
### 3. Server Failure Scenarios (`tests/integration/test_mail_failures.py`)
**7 tests in ~20s**
**Failure Modes**
- Server never started (connection refused)
- Server crash during operation
- Network partition (timeout)
- Server 500 errors
- Invalid bearer token (401)
- Malformed JSON responses
**Graceful Degradation**
- Agents continue working in Beads-only mode
- JSONL remains consistent across failures
- No crashes or data loss
### 4. Reservation TTL & Expiration (`tests/integration/test_reservation_ttl.py`)
**4 tests in ~60s** (includes 30s waits for expiration)
**Time-Based Behavior**
- Short TTL reservations (30s)
- Reservation blocking verification
- Auto-release after expiration
- Renewal/heartbeat mechanisms
### 5. Multi-Agent Coordination (`tests/integration/test_multi_agent_coordination.py`)
**4 tests in ~11s** ⭐ NEW
**Fairness**
- 10 agents competing for 5 issues
- Each issue claimed exactly once
- No duplicate claims in JSONL
**Notifications**
- End-to-end message delivery
- Inbox consumption (messages cleared after read)
- Message structure validation
**Handoff Scenarios**
- Agent releases, another immediately claims
- Clean reservation ownership transfer
**Idempotency**
- Double reserve by same agent (safe)
- Double release by same agent (safe)
- Reservation count verification
## Coverage Gaps (Intentionally Not Tested)
### Low-Priority Edge Cases
- **Path traversal in issue IDs**: Issue IDs are validated elsewhere in bd
- **429 Retry-After logic**: Nice-to-have, not critical for v1
- **HTTPS/TLS verification**: Out of scope for integration layer
- **Re-enable after recovery**: Complex, requires persistent health checking
- **Token rotation mid-run**: Rare scenario, not worth complexity
- **Slow tests**: 50+ agent stress tests, soak tests, inbox flood (>10k messages)
### Why Skipped
These scenarios are either:
1. **Validated elsewhere** (e.g., issue ID validation in bd core)
2. **Low probability** (e.g., token rotation during agent run)
3. **Nice-to-have features** (e.g., automatic re-enable, retry policies)
4. **Too slow for CI** (e.g., multi-hour soak tests, 50-agent races)
## Test Execution
### Run All Tests
```bash
# Unit tests (fast, 0.02s)
python3 lib/test_beads_mail_adapter.py
# Multi-agent coordination (11s)
python3 tests/integration/test_multi_agent_coordination.py
# Race conditions (15s, requires Agent Mail server or falls back)
python3 tests/integration/test_agent_race.py
# Failure scenarios (20s)
python3 tests/integration/test_mail_failures.py
# TTL/expiration (60s - includes deliberate waits)
python3 tests/integration/test_reservation_ttl.py
```
### Quick Validation (No Slow Tests)
```bash
python3 lib/test_beads_mail_adapter.py
python3 tests/integration/test_multi_agent_coordination.py
python3 tests/integration/test_mail_failures.py
# Total: ~31s
```
## Assertions Verified
**Correctness**
- Only one agent claims each issue (collision prevention)
- Notifications deliver correctly
- Reservations block other agents
- JSONL remains consistent across all failure modes
**Reliability**
- Graceful degradation when server unavailable
- Idempotent operations don't corrupt state
- Expired reservations auto-release
- Handoffs work cleanly
**Performance**
- Fast timeout detection (1-2s)
- No blocking on server failures
- Tests complete in reasonable time (<2min total)
## Future Enhancements (Optional)
If real-world usage reveals issues:
1. **Retry policies** with exponential backoff for 429/5xx
2. **Pagination** for inbox/reservations (if >1k messages)
3. **Automatic re-enable** with periodic health checks
4. **Agent instance IDs** to prevent same-name collisions
5. **Soak/stress testing** for production validation
Current test suite provides **strong confidence** for multi-agent workflows without overengineering.

View File

@@ -0,0 +1,510 @@
#!/usr/bin/env python3
"""
Multi-Agent Coordination Test Suite
Fast tests (<30s total) covering critical multi-agent scenarios:
- Fairness: N agents claiming M issues
- Notifications: End-to-end message passing
- Handoff: Release → immediate claim by another agent
- Idempotency: Double operations by same agent
"""
import json
import subprocess
import tempfile
import shutil
import sys
import time
from pathlib import Path
from multiprocessing import Process, Queue
from threading import Thread, Lock
from http.server import HTTPServer, BaseHTTPRequestHandler
import socket
# Add lib directory for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))
from beads_mail_adapter import AgentMailAdapter
class MockAgentMailServer:
"""Lightweight mock server with reservations and notifications."""
def __init__(self, port: int = 0):
self.port = port
self.server = None
self.thread = None
self.reservations = {} # file_path -> agent_name
self.notifications = {} # agent_name -> [messages]
self.lock = Lock()
def start(self) -> int:
"""Start server and return port."""
handler = self._create_handler()
if self.port == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('', 0))
s.listen(1)
self.port = s.getsockname()[1]
self.server = HTTPServer(('127.0.0.1', self.port), handler)
self.thread = Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
time.sleep(0.1)
return self.port
def stop(self):
if self.server:
self.server.shutdown()
self.server.server_close()
def _create_handler(self):
parent = self
class Handler(BaseHTTPRequestHandler):
def log_message(self, *args):
pass
def do_GET(self):
if self.path == "/api/health":
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "ok"}')
# Get inbox: /api/notifications/{agent_name}
elif self.path.startswith("/api/notifications/"):
agent_name = self.path.split('/')[-1]
with parent.lock:
messages = parent.notifications.get(agent_name, [])
parent.notifications[agent_name] = [] # Clear after read
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(messages).encode())
elif self.path == "/api/reservations":
with parent.lock:
res_list = [
{"file_path": fp, "agent_name": agent}
for fp, agent in parent.reservations.items()
]
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(res_list).encode())
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length) if content_length > 0 else b'{}'
data = json.loads(body.decode('utf-8'))
# Reserve: /api/reservations
if self.path == "/api/reservations":
file_path = data.get("file_path")
agent_name = data.get("agent_name")
with parent.lock:
if file_path in parent.reservations:
existing = parent.reservations[file_path]
if existing != agent_name:
# Conflict
self.send_response(409)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({
"error": f"Already reserved by {existing}"
}).encode())
return
# else: same agent re-reserving (idempotent)
parent.reservations[file_path] = agent_name
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "reserved"}')
# Notify: /api/notifications
elif self.path == "/api/notifications":
from_agent = data.get("from_agent")
event_type = data.get("event_type")
payload = data.get("payload", {})
# Broadcast to all OTHER agents
with parent.lock:
for agent_name in list(parent.notifications.keys()):
if agent_name != from_agent:
parent.notifications[agent_name].append({
"from": from_agent,
"event": event_type,
"data": payload
})
# If target agent specified, ensure they get it
to_agent = payload.get("to_agent")
if to_agent and to_agent not in parent.notifications:
parent.notifications[to_agent] = [{
"from": from_agent,
"event": event_type,
"data": payload
}]
self.send_response(201)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(b'{"status": "sent"}')
else:
self.send_response(404)
self.end_headers()
def do_DELETE(self):
# Release: /api/reservations/{agent}/{issue_id}
parts = self.path.split('/')
if len(parts) >= 5:
agent_name = parts[3]
issue_id = parts[4]
file_path = f".beads/issues/{issue_id}"
with parent.lock:
if file_path in parent.reservations:
if parent.reservations[file_path] == agent_name:
del parent.reservations[file_path]
self.send_response(204)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
return Handler
class TestAgent:
"""Minimal test agent."""
def __init__(self, workspace: str, agent_name: str, mail_url: str):
self.workspace = workspace
self.agent_name = agent_name
self.mail = AgentMailAdapter(url=mail_url, agent_name=agent_name, timeout=2)
def run_bd(self, *args):
cmd = ["bd", "--no-daemon"] + list(args) + ["--json"]
result = subprocess.run(cmd, cwd=self.workspace, capture_output=True, text=True)
if result.returncode != 0:
return {"error": result.stderr}
if result.stdout.strip():
try:
return json.loads(result.stdout)
except:
return {"error": "Invalid JSON"}
return {}
def create_issue(self, title: str) -> str:
result = self.run_bd("create", title, "-p", "1")
return result.get("id")
def claim_issue(self, issue_id: str) -> bool:
if self.mail.enabled and not self.mail.reserve_issue(issue_id):
return False
result = self.run_bd("update", issue_id, "--status", "in_progress")
return "error" not in result
def release_issue(self, issue_id: str):
if self.mail.enabled:
self.mail.release_issue(issue_id)
def agent_claim_worker(agent_name: str, workspace: str, issue_id: str,
mail_url: str, result_queue: Queue):
"""Worker that tries to claim a single issue."""
try:
agent = TestAgent(workspace, agent_name, mail_url)
success = agent.claim_issue(issue_id)
result_queue.put({"agent": agent_name, "issue": issue_id, "success": success})
except Exception as e:
result_queue.put({"agent": agent_name, "issue": issue_id, "success": False, "error": str(e)})
def test_fairness_n_agents_m_issues():
"""Test that N agents competing for M issues results in exactly M claims."""
print("\n" + "="*70)
print("TEST 1: Fairness - 10 agents, 5 issues")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-fairness-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create 5 issues
agent = TestAgent(workspace, "setup", mail_url)
issues = [agent.create_issue(f"Issue {i+1}") for i in range(5)]
# Spawn 10 agents trying to claim all 5 issues
result_queue = Queue()
processes = []
for agent_num in range(10):
for issue_id in issues:
p = Process(target=agent_claim_worker,
args=(f"agent-{agent_num}", workspace, issue_id, mail_url, result_queue))
processes.append(p)
# Start all at once
for p in processes:
p.start()
for p in processes:
p.join(timeout=10)
# Collect results
results = []
while not result_queue.empty():
results.append(result_queue.get())
# Count successful claims per issue
claims_per_issue = {}
for r in results:
if r["success"]:
issue = r["issue"]
claims_per_issue[issue] = claims_per_issue.get(issue, 0) + 1
print(f" • Total attempts: {len(results)}")
print(f" • Successful claims: {sum(claims_per_issue.values())}")
print(f" • Claims per issue: {claims_per_issue}")
# Verify exactly 1 claim per issue
for issue_id in issues:
claims = claims_per_issue.get(issue_id, 0)
assert claims == 1, f"Issue {issue_id} claimed {claims} times (expected 1)"
print("✅ PASS: Each issue claimed exactly once")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_notification_end_to_end():
"""Test notifications from agent1 to agent2."""
print("\n" + "="*70)
print("TEST 2: Notification End-to-End")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-notify-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
# Create two agents
agent1 = TestAgent(workspace, "agent1", mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url)
# Register agent2's inbox
server.notifications["agent2"] = []
# Agent1 sends notification
sent = agent1.mail.notify("task_completed", {
"issue_id": "bd-123",
"status": "done",
"to_agent": "agent2"
})
assert sent, "Should send notification"
# Agent2 checks inbox
messages = agent2.mail.check_inbox()
print(f" • Agent1 sent notification")
print(f" • Agent2 received {len(messages)} message(s)")
assert len(messages) == 1, f"Expected 1 message, got {len(messages)}"
assert messages[0]["from"] == "agent1"
assert messages[0]["event"] == "task_completed"
assert messages[0]["data"]["issue_id"] == "bd-123"
# Second check should be empty (messages consumed)
messages2 = agent2.mail.check_inbox()
assert len(messages2) == 0, "Inbox should be empty after read"
print("✅ PASS: Notification delivered correctly")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_reservation_handoff():
"""Test immediate claim after release (handoff scenario)."""
print("\n" + "="*70)
print("TEST 3: Reservation Handoff")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-handoff-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
agent1 = TestAgent(workspace, "agent1", mail_url)
agent2 = TestAgent(workspace, "agent2", mail_url)
# Agent1 creates and claims issue
issue_id = agent1.create_issue("Handoff test")
claimed1 = agent1.claim_issue(issue_id)
assert claimed1, "Agent1 should claim issue"
# Agent2 tries to claim (should fail - reserved)
claimed2_before = agent2.claim_issue(issue_id)
assert not claimed2_before, "Agent2 should be blocked"
# Agent1 releases
agent1.release_issue(issue_id)
# Agent2 immediately claims (handoff)
claimed2_after = agent2.claim_issue(issue_id)
assert claimed2_after, "Agent2 should claim after release"
# Verify reservation ownership
reservations = agent2.mail.get_reservations()
assert len(reservations) == 1
assert reservations[0]["agent_name"] == "agent2"
print("✅ PASS: Clean handoff from agent1 to agent2")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def test_idempotent_operations():
"""Test double reserve and double release by same agent."""
print("\n" + "="*70)
print("TEST 4: Idempotent Operations")
print("="*70)
workspace = tempfile.mkdtemp(prefix="bd-test-idem-")
server = MockAgentMailServer()
try:
subprocess.run(["bd", "init", "--quiet", "--prefix", "test"],
cwd=workspace, check=True, capture_output=True)
port = server.start()
mail_url = f"http://127.0.0.1:{port}"
agent = TestAgent(workspace, "agent1", mail_url)
issue_id = agent.create_issue("Idempotency test")
# Reserve twice (idempotent)
reserve1 = agent.mail.reserve_issue(issue_id)
reserve2 = agent.mail.reserve_issue(issue_id)
assert reserve1, "First reserve should succeed"
assert reserve2, "Second reserve should be idempotent (same agent)"
# Verify only one reservation
reservations = agent.mail.get_reservations()
assert len(reservations) == 1, f"Should have 1 reservation, got {len(reservations)}"
# Release twice (idempotent)
release1 = agent.mail.release_issue(issue_id)
release2 = agent.mail.release_issue(issue_id)
assert release1, "First release should succeed"
assert release2, "Second release should be idempotent (no error)"
# Verify no reservations
reservations_after = agent.mail.get_reservations()
assert len(reservations_after) == 0, "Should have 0 reservations after release"
print("✅ PASS: Double reserve and release are idempotent")
return True
finally:
server.stop()
shutil.rmtree(workspace, ignore_errors=True)
def main():
"""Run coordination tests."""
print("🧪 Multi-Agent Coordination Test Suite")
print("Fast tests for critical coordination scenarios")
try:
subprocess.run(["bd", "--version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("❌ ERROR: bd command not found")
sys.exit(1)
tests = [
("Fairness (10 agents, 5 issues)", test_fairness_n_agents_m_issues),
("Notification end-to-end", test_notification_end_to_end),
("Reservation handoff", test_reservation_handoff),
("Idempotent operations", test_idempotent_operations),
]
passed = 0
failed = 0
start_time = time.time()
for name, test_func in tests:
try:
if test_func():
passed += 1
except AssertionError as e:
print(f"\n❌ FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f"\n💥 ERROR in {name}: {e}")
import traceback
traceback.print_exc()
failed += 1
elapsed = time.time() - start_time
print("\n" + "="*70)
print("SUMMARY")
print("="*70)
print(f"✅ Passed: {passed}/{len(tests)}")
print(f"❌ Failed: {failed}/{len(tests)}")
print(f"⏱️ Total time: {elapsed:.1f}s")
if failed == 0:
print("\n🎉 All coordination tests passed!")
sys.exit(0)
else:
print(f"\n⚠️ {failed} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()