#!/usr/bin/env python3
"""
Simple AI agent workflow using bd (Beads issue tracker).

This demonstrates how an agent can:
1. Find ready work
2. Claim and execute tasks
3. Discover new issues during work
4. Link discoveries back to parent tasks
5. Complete work and move on
6. Coordinate with other agents via Agent Mail (optional)
"""

import json
import subprocess
import sys
import os
from pathlib import Path
from typing import Any, Optional

# Add lib directory to path for beads_mail_adapter
lib_path = Path(__file__).parent.parent.parent / "lib"
sys.path.insert(0, str(lib_path))

try:
    from beads_mail_adapter import AgentMailAdapter
except ModuleNotFoundError as exc:
    # Only tolerate the adapter module itself being absent; a missing
    # dependency *of* the adapter is a real installation problem.
    if exc.name != "beads_mail_adapter":
        raise

    class AgentMailAdapter:
        """No-op stand-in used when ``beads_mail_adapter`` is not importable.

        Agent Mail is an optional extra, so the agent keeps working in
        Beads-only mode. Only the attributes and methods this script
        actually calls are provided.
        """

        enabled = False
        agent_name = "local-agent"

        def check_inbox(self) -> list:
            """Return no messages."""
            return []

        def reserve_issue(self, issue_id: str) -> bool:
            """Always grant the reservation (nobody to conflict with)."""
            return True

        def release_issue(self, issue_id: str) -> bool:
            """Nothing to release."""
            return True

        def notify(self, event_type: str, payload: dict) -> bool:
            """Drop the notification."""
            return True


class BeadsAgent:
    """Simple agent that manages tasks using bd."""

    # Title prefix of the follow-up issues created by simulate_work(); also
    # used to recognise those follow-ups so they do not spawn more of them.
    _FOLLOW_UP_PREFIX = "Add tests for "

    def __init__(self):
        # ID of the issue this agent has successfully claimed, or None.
        self.current_task = None
        self.mail = AgentMailAdapter()
        if self.mail.enabled:
            print(f"📬 Agent Mail enabled (agent: {self.mail.agent_name})")
        else:
            print("📭 Agent Mail disabled (Beads-only mode)")

    def run_bd(self, *args) -> Any:
        """Run a bd command with ``--json`` and return the parsed output.

        Args:
            *args: Arguments placed between ``bd`` and ``--json``.

        Returns:
            The decoded JSON value (a dict or a list, depending on the
            command), or ``{}`` when bd printed nothing.

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
            FileNotFoundError: the ``bd`` executable is not on PATH.
            json.JSONDecodeError: bd printed something that is not JSON.
        """
        cmd = ["bd", *args, "--json"]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        if result.stdout.strip():
            return json.loads(result.stdout)
        return {}

    def find_ready_work(self) -> Optional[dict]:
        """Find the highest priority ready work.

        Integration Point 1: Check inbox before finding work.

        Returns:
            The first issue reported by ``bd ready --limit 1``, or ``None``
            when bd returns anything other than a non-empty list.
        """
        # Check inbox for notifications from other agents
        messages = self.mail.check_inbox()
        if messages:
            print(f"📨 Received {len(messages)} messages:")
            for msg in messages:
                event_type = msg.get("event_type", "unknown")
                payload = msg.get("payload", {})
                from_agent = msg.get("from_agent", "unknown")
                print(f"   • {event_type} from {from_agent}: {payload}")

        ready = self.run_bd("ready", "--limit", "1")
        if isinstance(ready, list) and ready:
            return ready[0]
        return None

    def claim_task(self, issue_id: str) -> dict:
        """Claim a task by setting status to in_progress.

        Integration Point 2: Reserve issue before claiming.
        Integration Point 3: Notify other agents of status change.

        On success ``self.current_task`` is set to ``issue_id``; callers
        should check that rather than the return value, because bd may
        legitimately print nothing and yield ``{}`` on success too.

        Returns:
            The parsed output of ``bd update``, or ``{}`` when the
            reservation was refused.

        Raises:
            Whatever ``run_bd`` raises; the reservation is released first.
        """
        # Reserve the issue to prevent conflicts with other agents
        if not self.mail.reserve_issue(issue_id):
            print(f"⚠️  Failed to reserve {issue_id} - already claimed by another agent")
            return {}

        print(f"📋 Claiming task: {issue_id}")
        try:
            result = self.run_bd("update", issue_id, "--status", "in_progress")
        except BaseException:
            # The status change did not happen, so do not keep other agents
            # locked out of the issue.
            self.mail.release_issue(issue_id)
            raise

        self.current_task = issue_id

        # Notify other agents of status change
        self.mail.notify("status_changed", {
            "issue_id": issue_id,
            "status": "in_progress",
            "agent": self.mail.agent_name,
        })

        return result

    def create_issue(self, title: str, description: str = "",
                     priority: int = 2, issue_type: str = "task") -> dict:
        """Create a new issue and return bd's parsed output for it."""
        print(f"✨ Creating issue: {title}")
        args = ["create", title, "-p", str(priority), "-t", issue_type]
        if description:
            args.extend(["-d", description])
        return self.run_bd(*args)

    def link_discovery(self, discovered_id: str, parent_id: str) -> None:
        """Link a discovered issue back to its parent.

        Runs ``bd dep add <discovered_id> <parent_id> --type discovered-from``.
        The command is run without ``--json`` and its output is not captured.

        Raises:
            subprocess.CalledProcessError: bd exited with a non-zero status.
        """
        print(f"🔗 Linking {discovered_id} ← discovered-from ← {parent_id}")
        subprocess.run(
            ["bd", "dep", "add", discovered_id, parent_id, "--type", "discovered-from"],
            check=True,
        )

    def complete_task(self, issue_id: str, reason: str = "Completed"):
        """Mark task as complete.

        Integration Point 4: Release reservation and notify completion.

        Returns:
            The parsed output of ``bd close``.
        """
        print(f"✅ Completing task: {issue_id} - {reason}")
        result = self.run_bd("close", issue_id, "--reason", reason)

        # Notify other agents of completion
        self.mail.notify("issue_completed", {
            "issue_id": issue_id,
            "reason": reason,
            "agent": self.mail.agent_name,
        })

        # Release the reservation
        self.mail.release_issue(issue_id)

        if self.current_task == issue_id:
            self.current_task = None

        return result

    def simulate_work(self, issue: dict) -> bool:
        """Simulate doing work on an issue.

        In a real agent, this would call an LLM, execute code, etc.
        Returns True if work discovered new issues.
        """
        issue_id = issue["id"]
        title = issue["title"]

        print(f"\n🤖 Working on: {title} ({issue_id})")
        print(f"   Priority: {issue.get('priority', '?')}, Type: {issue.get('issue_type', '?')}")

        # Follow-up issues created below contain "add" in their own title;
        # without this guard each one would spawn yet another follow-up.
        if title.startswith(self._FOLLOW_UP_PREFIX):
            return False

        # Simulate discovering a bug while working
        lowered = title.lower()
        if "implement" in lowered or "add" in lowered:
            print("\n💡 Discovered: Missing test coverage for this feature")
            new_issue = self.create_issue(
                f"{self._FOLLOW_UP_PREFIX}{title}",
                description=f"While implementing {issue_id}, noticed missing tests",
                priority=1,
                issue_type="task",
            )
            new_id = new_issue.get("id") if isinstance(new_issue, dict) else None
            if new_id:
                self.link_discovery(new_id, issue_id)
            else:
                # bd printed nothing usable; the issue may exist but cannot be linked.
                print(f"⚠️  bd returned no id for the new issue; not linked to {issue_id}")
            return True

        return False

    def run_once(self) -> bool:
        """Execute one work cycle. Returns True if work was found.

        When work is found but cannot be claimed (another agent holds the
        reservation) the issue is left untouched and True is still returned,
        so the caller's bounded loop can try again.
        """
        # Find ready work
        issue = self.find_ready_work()

        if not issue:
            print("📭 No ready work found.")
            return False

        issue_id = issue["id"]

        # Claim the task
        self.claim_task(issue_id)
        if self.current_task != issue_id:
            # Never work on or close an issue we do not hold.
            print(f"⏭️  Skipping {issue_id} (not claimed)")
            return True

        # Do the work (simulated)
        discovered_new_work = self.simulate_work(issue)

        # Complete the task
        self.complete_task(issue_id, "Implemented successfully")

        if discovered_new_work:
            print("\n🔄 New work discovered and linked. Running another cycle...")

        return True

    def run(self, max_iterations: int = 10):
        """Run work cycles until none is found or max_iterations is reached."""
        print("🚀 Beads Agent starting...\n")

        for i in range(max_iterations):
            print(f"\n{'='*60}")
            print(f"Iteration {i+1}/{max_iterations}")
            print(f"{'='*60}")

            if not self.run_once():
                break

        print("\n✨ Agent finished!")


def main():
    """Main entry point.

    Exits with status 1 when bd fails or cannot be found, and with status 0
    on Ctrl-C.
    """
    try:
        agent = BeadsAgent()
        agent.run()
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        # A missing bd binary raises FileNotFoundError, not CalledProcessError.
        print(f"Error running bd: {e}", file=sys.stderr)
        details = getattr(e, "stderr", None)
        if details:
            # run_bd captures stderr, so surface bd's own message.
            print(details.strip(), file=sys.stderr)
        print("Make sure bd is installed: go install github.com/steveyegge/beads/cmd/bd@latest")
        sys.exit(1)
    except KeyboardInterrupt:
        print("\n\n👋 Agent interrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    main()