- Created test_agent_race.py with 3 test scenarios
- Tests collision prevention with Agent Mail reservations
- Validates that only one agent claims an issue when reservations are active
- Demonstrates the collision problem when Agent Mail is disabled
- Includes a stress test with 10 agents
- Non-interactive mode support for CI/automation

Amp-Thread-ID: https://ampcode.com/threads/T-2fb10899-490f-4d41-b003-8bc4d467cc54
Co-authored-by: Amp <amp@ampcode.com>
210 lines
6.7 KiB
Python
Executable File
210 lines
6.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Simple AI agent workflow using bd (Beads issue tracker).
|
|
|
|
This demonstrates how an agent can:
|
|
1. Find ready work
|
|
2. Claim and execute tasks
|
|
3. Discover new issues during work
|
|
4. Link discoveries back to parent tasks
|
|
5. Complete work and move on
|
|
6. Coordinate with other agents via Agent Mail (optional)
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Add lib directory to path for beads_mail_adapter
|
|
lib_path = Path(__file__).parent.parent.parent / "lib"
|
|
sys.path.insert(0, str(lib_path))
|
|
|
|
from beads_mail_adapter import AgentMailAdapter
|
|
|
|
|
|
class BeadsAgent:
    """Simple agent that manages tasks using bd.

    The agent shells out to the ``bd`` CLI for all issue operations and uses
    an ``AgentMailAdapter`` (``self.mail``) to reserve issues and exchange
    notifications with other agents.
    """

    def __init__(self):
        # ID of the issue this agent currently holds a claim on, or None.
        # Set by claim_task() on success and cleared by complete_task().
        self.current_task = None
        self.mail = AgentMailAdapter()

        if self.mail.enabled:
            print(f"📬 Agent Mail enabled (agent: {self.mail.agent_name})")
        else:
            print("📭 Agent Mail disabled (Beads-only mode)")

    def run_bd(self, *args) -> dict:
        """Run a bd command with ``--json`` and return the parsed output.

        Args:
            *args: Sub-command and arguments passed to ``bd`` verbatim.

        Returns:
            The decoded JSON document, or ``{}`` when bd printed nothing.
            Note that some commands produce a JSON array rather than an
            object (``find_ready_work`` checks for a list).

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
            FileNotFoundError: the ``bd`` executable is not on PATH.
            json.JSONDecodeError: bd printed something that is not JSON.
        """
        cmd = ["bd", *args, "--json"]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)

        if result.stdout.strip():
            return json.loads(result.stdout)
        return {}

    def find_ready_work(self) -> Optional[dict]:
        """Find the highest priority ready work.

        Integration Point 1: Check inbox before finding work.

        Returns:
            The first issue reported by ``bd ready --limit 1``, or ``None``
            when bd reports nothing ready.
        """
        # Check inbox for notifications from other agents
        messages = self.mail.check_inbox()
        if messages:
            print(f"📨 Received {len(messages)} messages:")
            for msg in messages:
                event_type = msg.get("event_type", "unknown")
                payload = msg.get("payload", {})
                from_agent = msg.get("from_agent", "unknown")
                print(f" • {event_type} from {from_agent}: {payload}")

        ready = self.run_bd("ready", "--limit", "1")

        if isinstance(ready, list) and ready:
            return ready[0]
        return None

    def claim_task(self, issue_id: str) -> dict:
        """Claim a task by setting status to in_progress.

        Integration Point 2: Reserve issue before claiming.
        Integration Point 3: Notify other agents of status change.

        On success ``self.current_task`` is set to ``issue_id``; callers
        should check that attribute (not the return value) to learn whether
        the claim succeeded, because bd may legitimately print nothing.

        Returns:
            The parsed output of ``bd update``, or ``{}`` when the
            reservation could not be obtained.
        """
        # Reserve the issue to prevent conflicts with other agents
        if not self.mail.reserve_issue(issue_id):
            print(f"⚠️ Failed to reserve {issue_id} - already claimed by another agent")
            return {}

        print(f"📋 Claiming task: {issue_id}")
        try:
            result = self.run_bd("update", issue_id, "--status", "in_progress")
        except Exception:
            # The status update failed, so we do not actually own the issue:
            # give the reservation back before propagating the error.
            self.mail.release_issue(issue_id)
            raise

        self.current_task = issue_id

        # Notify other agents of status change
        self.mail.notify("status_changed", {
            "issue_id": issue_id,
            "status": "in_progress",
            "agent": self.mail.agent_name,
        })

        return result

    def create_issue(self, title: str, description: str = "",
                     priority: int = 2, issue_type: str = "task") -> dict:
        """Create a new issue via ``bd create`` and return bd's parsed output."""
        print(f"✨ Creating issue: {title}")
        args = ["create", title, "-p", str(priority), "-t", issue_type]
        if description:
            args.extend(["-d", description])
        return self.run_bd(*args)

    def link_discovery(self, discovered_id: str, parent_id: str):
        """Link a discovered issue back to its parent.

        Runs ``bd dep add`` with dependency type ``discovered-from``.

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
        """
        print(f"🔗 Linking {discovered_id} ← discovered-from ← {parent_id}")
        subprocess.run(
            ["bd", "dep", "add", discovered_id, parent_id, "--type", "discovered-from"],
            check=True,
        )

    def complete_task(self, issue_id: str, reason: str = "Completed"):
        """Mark task as complete.

        Integration Point 4: Release reservation and notify completion.

        The reservation is released even when ``bd close`` fails, so that a
        crashed close does not leave the issue locked for other agents. The
        completion notification is only sent after a successful close.

        Returns:
            The parsed output of ``bd close``.
        """
        print(f"✅ Completing task: {issue_id} - {reason}")
        try:
            result = self.run_bd("close", issue_id, "--reason", reason)

            # Notify other agents of completion
            self.mail.notify("issue_completed", {
                "issue_id": issue_id,
                "reason": reason,
                "agent": self.mail.agent_name,
            })
        finally:
            # Release the reservation
            self.mail.release_issue(issue_id)
            if self.current_task == issue_id:
                self.current_task = None

        return result

    def simulate_work(self, issue: dict) -> bool:
        """Simulate doing work on an issue.

        In a real agent, this would call an LLM, execute code, etc.
        Returns True if work discovered new issues.

        Args:
            issue: Issue record as returned by bd; must contain ``id`` and
                ``title``. ``priority`` and ``issue_type`` are only printed.
        """
        issue_id = issue["id"]
        title = issue["title"]

        print(f"\n🤖 Working on: {title} ({issue_id})")
        print(f" Priority: {issue.get('priority')}, Type: {issue.get('issue_type')}")

        # Simulate discovering a bug while working
        lowered = title.lower()
        if "implement" in lowered or "add" in lowered:
            print("\n💡 Discovered: Missing test coverage for this feature")
            new_issue = self.create_issue(
                f"Add tests for {title}",
                description=f"While implementing {issue_id}, noticed missing tests",
                priority=1,
                issue_type="task",
            )
            new_id = new_issue.get("id") if isinstance(new_issue, dict) else None
            if not new_id:
                # bd did not report the new issue's ID, so there is nothing
                # to link; report it instead of crashing with a KeyError.
                print("Could not determine ID of created issue; skipping link",
                      file=sys.stderr)
                return False
            self.link_discovery(new_id, issue_id)
            return True

        return False

    def run_once(self) -> bool:
        """Execute one work cycle. Returns True if work was found.

        When ready work exists but another agent holds the reservation, the
        issue is skipped (not worked on or closed) and True is returned so
        the caller may try again on its next iteration.
        """
        # Find ready work
        issue = self.find_ready_work()

        if not issue:
            print("📭 No ready work found.")
            return False

        issue_id = issue["id"]

        # Claim the task
        self.claim_task(issue_id)
        if self.current_task != issue_id:
            # Reservation was refused: another agent owns this issue, so
            # working on or closing it would recreate the collision the
            # reservation is meant to prevent.
            return True

        # Do the work (simulated)
        discovered_new_work = self.simulate_work(issue)

        # Complete the task
        self.complete_task(issue_id, "Implemented successfully")

        if discovered_new_work:
            print("\n🔄 New work discovered and linked. Running another cycle...")

        return True

    def run(self, max_iterations: int = 10):
        """Run the agent for multiple iterations.

        Stops early as soon as ``run_once`` reports that no work was found.
        """
        print("🚀 Beads Agent starting...\n")

        for i in range(max_iterations):
            print(f"\n{'='*60}")
            print(f"Iteration {i+1}/{max_iterations}")
            print(f"{'='*60}")

            if not self.run_once():
                break

        print("\n✨ Agent finished!")
def main():
    """Main entry point.

    Builds a ``BeadsAgent`` and runs it. Exits with status 1 when bd fails
    or cannot be launched, and with status 0 when the user interrupts.
    """
    try:
        agent = BeadsAgent()
        agent.run()
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # FileNotFoundError is what subprocess raises when the bd executable
        # is missing, which is exactly the case the install hint is for.
        print(f"Error running bd: {e}", file=sys.stderr)
        # run_bd captures bd's stderr, so surface it here or it is lost.
        bd_stderr = getattr(e, "stderr", None)
        if bd_stderr:
            print(bd_stderr.strip(), file=sys.stderr)
        print("Make sure bd is installed: go install github.com/steveyegge/beads/cmd/bd@latest",
              file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\n👋 Agent interrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    main()