feat: add Jira export script (jsonl2jira.py)

Add a Python script to push bd issues to Jira.

Features:
- Create new Jira issues from bd issues without external_ref
- Update existing Jira issues matched by external_ref
- Handle Jira workflow transitions for status changes
- Reverse field mappings (bd -> Jira) via config
- Dry-run mode for previewing changes
- Auto-update external_ref after creation (--update-refs)

Also updates README to document bidirectional sync workflow.

Closes bd-93d

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-11-30 15:19:51 -08:00
parent 9aada0fd89
commit cbf6545b95
2 changed files with 924 additions and 3 deletions

View File

@@ -1,16 +1,32 @@
# Jira Issues to bd Importer
# Jira Integration for bd
Import issues from Jira Cloud or Jira Server/Data Center into `bd`.
Two-way synchronization between Jira and bd (beads).
## Scripts
| Script | Purpose |
|--------|---------|
| `jira2jsonl.py` | **Import** - Fetch Jira issues into bd |
| `jsonl2jira.py` | **Export** - Push bd issues to Jira |
## Overview
This tool converts Jira Issues to bd's JSONL format, supporting:
These tools enable bidirectional sync between Jira and bd:
**Import (Jira → bd):**
1. **Jira REST API** - Fetch issues directly from any Jira instance
2. **JSON Export** - Parse exported Jira issues JSON
3. **bd config integration** - Read credentials and mappings from `bd config`
**Export (bd → Jira):**
1. **Create issues** - Push new bd issues to Jira
2. **Update issues** - Sync changes to existing Jira issues
3. **Status transitions** - Handle Jira workflow transitions automatically
## Features
### Import (jira2jsonl.py)
- Fetch from Jira Cloud or Server/Data Center
- JQL query support for flexible filtering
- Configurable field mappings (status, priority, type)
@@ -19,6 +35,15 @@ This tool converts Jira Issues to bd's JSONL format, supporting:
- Set `external_ref` for re-sync capability
- Hash-based or sequential ID generation
### Export (jsonl2jira.py)
- Create new Jira issues from bd issues
- Update existing Jira issues (matched by `external_ref`)
- Handle Jira workflow transitions for status changes
- Reverse field mappings (bd → Jira)
- Dry-run mode for previewing changes
- Auto-update `external_ref` after creation
## Installation
No dependencies required! Uses Python 3 standard library.
@@ -351,6 +376,164 @@ Jira Cloud has rate limits. For large imports:
This script fetches 100 issues per request, so a 1000-issue project requires ~10 API calls.
---
# Export: jsonl2jira.py
Push bd issues to Jira.
## Export Quick Start
```bash
# Export all issues (create new, update existing)
bd export | python jsonl2jira.py --from-config
# Create only (don't update existing Jira issues)
bd export | python jsonl2jira.py --from-config --create-only
# Dry run (preview what would happen)
bd export | python jsonl2jira.py --from-config --dry-run
# Auto-update bd with new external_refs
bd export | python jsonl2jira.py --from-config --update-refs
```
## Export Modes
### Create Only
Only create new Jira issues for bd issues that don't have an `external_ref`:
```bash
bd export | python jsonl2jira.py --from-config --create-only
```
### Create and Update
Create new issues AND update existing ones (matched by `external_ref`):
```bash
bd export | python jsonl2jira.py --from-config
```
### Dry Run
Preview what would happen without making any changes:
```bash
bd export | python jsonl2jira.py --from-config --dry-run
```
## Workflow Transitions
Jira often requires workflow transitions to change issue status (you can't just set `status=Done`). The export script automatically:
1. Fetches available transitions for each issue
2. Finds a transition that leads to the target status
3. Executes the transition
If no valid transition is found, the status change is skipped with a warning.
## Reverse Field Mappings
For export, you need mappings from bd → Jira (reverse of import):
```bash
# Status: bd status -> Jira status name
bd config set jira.reverse_status_map.open "To Do"
bd config set jira.reverse_status_map.in_progress "In Progress"
bd config set jira.reverse_status_map.blocked "Blocked"
bd config set jira.reverse_status_map.closed "Done"
# Type: bd type -> Jira issue type name
bd config set jira.reverse_type_map.bug "Bug"
bd config set jira.reverse_type_map.feature "Story"
bd config set jira.reverse_type_map.task "Task"
bd config set jira.reverse_type_map.epic "Epic"
bd config set jira.reverse_type_map.chore "Task"
# Priority: bd priority (0-4) -> Jira priority name
bd config set jira.reverse_priority_map.0 "Highest"
bd config set jira.reverse_priority_map.1 "High"
bd config set jira.reverse_priority_map.2 "Medium"
bd config set jira.reverse_priority_map.3 "Low"
bd config set jira.reverse_priority_map.4 "Lowest"
```
If not configured, sensible defaults are used.
## Updating external_ref
After creating a Jira issue, you'll want to link it back to the bd issue:
```bash
# Option 1: Auto-update with --update-refs flag
bd export | python jsonl2jira.py --from-config --update-refs
# Option 2: Manual update from script output
bd export | python jsonl2jira.py --from-config | while read line; do
bd_id=$(echo "$line" | jq -r '.bd_id')
ext_ref=$(echo "$line" | jq -r '.external_ref')
bd update "$bd_id" --external-ref="$ext_ref"
done
```
## Export Examples
### Example 1: Initial Export to Jira
```bash
# First, export all open issues
bd list --status open --json | python jsonl2jira.py --from-config --update-refs
# Now those issues have external_ref set
bd list --status open
```
### Example 2: Sync Changes Back to Jira
```bash
# Export issues modified today
bd list --json | python jsonl2jira.py --from-config
```
### Example 3: Preview Before Export
```bash
# See what would happen
bd export | python jsonl2jira.py --from-config --dry-run
# If it looks good, run for real
bd export | python jsonl2jira.py --from-config --update-refs
```
## Export Limitations
- **Assignee**: Not set (requires Jira account ID lookup)
- **Dependencies**: Not synced to Jira issue links
- **Comments**: Not exported
- **Custom fields**: design, acceptance_criteria, notes not exported
- **Attachments**: Not exported
## Bidirectional Sync Workflow
For ongoing synchronization between Jira and bd:
```bash
# 1. Pull changes from Jira
python jira2jsonl.py --from-config --jql "project=PROJ AND updated >= -1d" | bd import
# 2. Do local work in bd
bd update bd-xxx --status in_progress
# ... work ...
bd close bd-xxx
# 3. Push changes to Jira
bd export | python jsonl2jira.py --from-config
# 4. Repeat daily/weekly
```
## See Also
- [bd README](../../README.md) - Main documentation

View File

@@ -0,0 +1,738 @@
#!/usr/bin/env python3
"""
Export bd issues to Jira.
Creates new Jira issues from bd issues without external_ref, and optionally
updates existing Jira issues matched by external_ref.
Usage:
# Export all issues (create new, update existing)
bd export | python jsonl2jira.py --from-config
# Create only (don't update existing Jira issues)
bd export | python jsonl2jira.py --from-config --create-only
# Dry run (preview what would happen)
bd export | python jsonl2jira.py --from-config --dry-run
# From JSONL file
python jsonl2jira.py --from-config --file issues.jsonl
"""
import base64
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
def get_bd_config(key: str) -> Optional[str]:
"""Get a configuration value from bd config."""
try:
result = subprocess.run(
["bd", "config", "get", "--json", key],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
data = json.loads(result.stdout)
return data.get("value")
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
pass
return None
def get_all_bd_config() -> Dict[str, str]:
"""Get all configuration values from bd config."""
try:
result = subprocess.run(
["bd", "config", "list", "--json"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return json.loads(result.stdout)
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
pass
return {}
def get_reverse_status_mapping() -> Dict[str, str]:
"""
Get reverse status mapping (bd status -> Jira status).
Uses jira.reverse_status_map.* if configured, otherwise inverts jira.status_map.*.
Falls back to sensible defaults.
"""
config = get_all_bd_config()
# Check for explicit reverse mappings first
reverse_map = {}
for key, value in config.items():
if key.startswith("jira.reverse_status_map."):
bd_status = key[len("jira.reverse_status_map."):]
reverse_map[bd_status] = value
if reverse_map:
return reverse_map
# Invert the forward mapping
for key, value in config.items():
if key.startswith("jira.status_map."):
jira_status = key[len("jira.status_map."):]
# Value is bd status, key suffix is jira status
if value not in reverse_map:
reverse_map[value] = jira_status.replace("_", " ").title()
# Add defaults for any missing bd statuses
defaults = {
"open": "To Do",
"in_progress": "In Progress",
"blocked": "Blocked",
"closed": "Done",
}
for bd_status, jira_status in defaults.items():
if bd_status not in reverse_map:
reverse_map[bd_status] = jira_status
return reverse_map
def get_reverse_type_mapping() -> Dict[str, str]:
"""
Get reverse type mapping (bd type -> Jira issue type).
Uses jira.reverse_type_map.* if configured, otherwise inverts jira.type_map.*.
Falls back to sensible defaults.
"""
config = get_all_bd_config()
# Check for explicit reverse mappings first
reverse_map = {}
for key, value in config.items():
if key.startswith("jira.reverse_type_map."):
bd_type = key[len("jira.reverse_type_map."):]
reverse_map[bd_type] = value
if reverse_map:
return reverse_map
# Invert the forward mapping
for key, value in config.items():
if key.startswith("jira.type_map."):
jira_type = key[len("jira.type_map."):]
if value not in reverse_map:
reverse_map[value] = jira_type.replace("_", " ").title()
# Add defaults for any missing bd types
defaults = {
"bug": "Bug",
"feature": "Story",
"task": "Task",
"epic": "Epic",
"chore": "Task",
}
for bd_type, jira_type in defaults.items():
if bd_type not in reverse_map:
reverse_map[bd_type] = jira_type
return reverse_map
def get_reverse_priority_mapping() -> Dict[int, str]:
"""
Get reverse priority mapping (bd priority -> Jira priority name).
Uses jira.reverse_priority_map.* if configured.
Falls back to sensible defaults.
"""
config = get_all_bd_config()
# Check for explicit reverse mappings first
reverse_map = {}
for key, value in config.items():
if key.startswith("jira.reverse_priority_map."):
try:
bd_priority = int(key[len("jira.reverse_priority_map."):])
reverse_map[bd_priority] = value
except ValueError:
pass
if reverse_map:
return reverse_map
# Default mapping
return {
0: "Highest",
1: "High",
2: "Medium",
3: "Low",
4: "Lowest",
}
class BeadsToJira:
"""Export bd issues to Jira."""
def __init__(
self,
jira_url: str,
project: str,
username: Optional[str] = None,
api_token: Optional[str] = None,
create_only: bool = False,
dry_run: bool = False
):
self.jira_url = jira_url.rstrip("/")
self.project = project
self.username = username
self.api_token = api_token
self.create_only = create_only
self.dry_run = dry_run
# Determine auth method
self.is_cloud = "atlassian.net" in jira_url
if self.is_cloud:
if not username:
raise ValueError(
"Jira Cloud requires username (email). "
"Set JIRA_USERNAME env var or pass --username"
)
auth_string = f"{username}:{api_token}"
self.auth_header = f"Basic {base64.b64encode(auth_string.encode()).decode()}"
else:
if username:
auth_string = f"{username}:{api_token}"
self.auth_header = f"Basic {base64.b64encode(auth_string.encode()).decode()}"
else:
self.auth_header = f"Bearer {api_token}"
# Load mappings
self.status_map = get_reverse_status_mapping()
self.type_map = get_reverse_type_mapping()
self.priority_map = get_reverse_priority_mapping()
# Cache for Jira metadata
self._transitions_cache: Dict[str, List[Dict]] = {}
self._issue_types_cache: Optional[List[Dict]] = None
self._priorities_cache: Optional[List[Dict]] = None
# Results tracking
self.created: List[Tuple[str, str]] = [] # (bd_id, jira_key)
self.updated: List[Tuple[str, str]] = [] # (bd_id, jira_key)
self.skipped: List[Tuple[str, str]] = [] # (bd_id, reason)
self.errors: List[Tuple[str, str]] = [] # (bd_id, error)
def _make_request(
self,
method: str,
endpoint: str,
data: Optional[Dict] = None
) -> Optional[Dict]:
"""Make an authenticated request to Jira API."""
url = f"{self.jira_url}/rest/api/2/{endpoint}"
headers = {
"Authorization": self.auth_header,
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "bd-jira-export/1.0",
}
body = json.dumps(data).encode() if data else None
try:
req = Request(url, data=body, headers=headers, method=method)
with urlopen(req, timeout=30) as response:
response_body = response.read().decode()
if response_body:
return json.loads(response_body)
return {}
except HTTPError as e:
error_body = e.read().decode(errors="replace")
raise RuntimeError(f"Jira API error {e.code}: {error_body}")
except URLError as e:
raise RuntimeError(f"Network error: {e.reason}")
def get_issue_types(self) -> List[Dict]:
"""Get available issue types for the project."""
if self._issue_types_cache is not None:
return self._issue_types_cache
try:
result = self._make_request("GET", f"project/{self.project}")
self._issue_types_cache = result.get("issueTypes", [])
except Exception:
# Fallback: try createmeta endpoint
try:
result = self._make_request(
"GET",
f"issue/createmeta?projectKeys={self.project}&expand=projects.issuetypes"
)
projects = result.get("projects", [])
if projects:
self._issue_types_cache = projects[0].get("issuetypes", [])
else:
self._issue_types_cache = []
except Exception:
self._issue_types_cache = []
return self._issue_types_cache
def get_priorities(self) -> List[Dict]:
"""Get available priorities."""
if self._priorities_cache is not None:
return self._priorities_cache
try:
self._priorities_cache = self._make_request("GET", "priority") or []
except Exception:
self._priorities_cache = []
return self._priorities_cache
def get_transitions(self, issue_key: str) -> List[Dict]:
"""Get available transitions for an issue."""
if issue_key in self._transitions_cache:
return self._transitions_cache[issue_key]
try:
result = self._make_request("GET", f"issue/{issue_key}/transitions")
transitions = result.get("transitions", [])
self._transitions_cache[issue_key] = transitions
return transitions
except Exception:
return []
def find_issue_type_id(self, bd_type: str) -> Optional[str]:
"""Find Jira issue type ID for a bd type."""
jira_type_name = self.type_map.get(bd_type, "Task")
issue_types = self.get_issue_types()
# Try exact match first
for it in issue_types:
if it.get("name", "").lower() == jira_type_name.lower():
return it.get("id")
# Try partial match
for it in issue_types:
if jira_type_name.lower() in it.get("name", "").lower():
return it.get("id")
# Fallback to first non-subtask type
for it in issue_types:
if not it.get("subtask", False):
return it.get("id")
return None
def find_priority_id(self, bd_priority: int) -> Optional[str]:
"""Find Jira priority ID for a bd priority."""
jira_priority_name = self.priority_map.get(bd_priority, "Medium")
priorities = self.get_priorities()
# Try exact match first
for p in priorities:
if p.get("name", "").lower() == jira_priority_name.lower():
return p.get("id")
# Fallback to Medium or first available
for p in priorities:
if p.get("name", "").lower() == "medium":
return p.get("id")
if priorities:
return priorities[0].get("id")
return None
def find_transition(self, issue_key: str, target_status: str) -> Optional[str]:
"""Find transition ID to move issue to target status."""
jira_status = self.status_map.get(target_status, "To Do")
transitions = self.get_transitions(issue_key)
# Try exact match on target status
for t in transitions:
to_status = t.get("to", {}).get("name", "")
if to_status.lower() == jira_status.lower():
return t.get("id")
# Try partial match
for t in transitions:
to_status = t.get("to", {}).get("name", "")
if jira_status.lower() in to_status.lower():
return t.get("id")
return None
def extract_jira_key_from_external_ref(self, external_ref: str) -> Optional[str]:
"""Extract Jira issue key from external_ref URL."""
# Match patterns like:
# https://company.atlassian.net/browse/PROJ-123
# https://jira.company.com/browse/PROJ-123
match = re.search(r'/browse/([A-Z]+-\d+)', external_ref)
if match:
return match.group(1)
return None
def create_issue(self, bd_issue: Dict) -> Optional[str]:
"""Create a new Jira issue. Returns the Jira key."""
issue_type_id = self.find_issue_type_id(bd_issue.get("issue_type", "task"))
priority_id = self.find_priority_id(bd_issue.get("priority", 2))
if not issue_type_id:
raise RuntimeError(f"Could not find issue type for '{bd_issue.get('issue_type')}'")
fields = {
"project": {"key": self.project},
"summary": bd_issue.get("title", "Untitled"),
"description": bd_issue.get("description", ""),
"issuetype": {"id": issue_type_id},
}
if priority_id:
fields["priority"] = {"id": priority_id}
# Add labels if present
labels = bd_issue.get("labels", [])
if labels:
fields["labels"] = labels
# Add assignee if present (requires account ID for Cloud)
# This is complex - skip for now as it requires user lookup
# assignee = bd_issue.get("assignee")
if self.dry_run:
print(f"[DRY RUN] Would create: {bd_issue.get('title')}", file=sys.stderr)
return "DRY-RUN-KEY"
result = self._make_request("POST", "issue", {"fields": fields})
return result.get("key")
def update_issue(self, jira_key: str, bd_issue: Dict) -> bool:
"""Update an existing Jira issue. Returns True if updated."""
# First, get current issue to compare
try:
current = self._make_request("GET", f"issue/{jira_key}")
except RuntimeError:
return False
current_fields = current.get("fields", {})
updates = {}
# Check summary
if bd_issue.get("title") and bd_issue["title"] != current_fields.get("summary"):
updates["summary"] = bd_issue["title"]
# Check description
if bd_issue.get("description") != current_fields.get("description"):
updates["description"] = bd_issue.get("description", "")
# Check priority
current_priority = current_fields.get("priority", {}).get("name", "").lower()
target_priority = self.priority_map.get(bd_issue.get("priority", 2), "Medium").lower()
if current_priority != target_priority:
priority_id = self.find_priority_id(bd_issue.get("priority", 2))
if priority_id:
updates["priority"] = {"id": priority_id}
# Check labels
current_labels = set(current_fields.get("labels", []))
new_labels = set(bd_issue.get("labels", []))
if current_labels != new_labels:
updates["labels"] = list(new_labels)
if self.dry_run:
if updates:
print(f"[DRY RUN] Would update {jira_key}: {list(updates.keys())}", file=sys.stderr)
return bool(updates)
# Apply field updates
if updates:
self._make_request("PUT", f"issue/{jira_key}", {"fields": updates})
# Handle status transition separately
current_status = current_fields.get("status", {}).get("name", "").lower()
target_status = bd_issue.get("status", "open")
target_jira_status = self.status_map.get(target_status, "To Do").lower()
if current_status != target_jira_status:
transition_id = self.find_transition(jira_key, target_status)
if transition_id:
if self.dry_run:
print(f"[DRY RUN] Would transition {jira_key} to {target_jira_status}", file=sys.stderr)
else:
try:
self._make_request(
"POST",
f"issue/{jira_key}/transitions",
{"transition": {"id": transition_id}}
)
except RuntimeError as e:
print(f"Warning: Could not transition {jira_key}: {e}", file=sys.stderr)
return bool(updates) or current_status != target_jira_status
def process_issue(self, bd_issue: Dict) -> None:
"""Process a single bd issue."""
bd_id = bd_issue.get("id", "unknown")
external_ref = bd_issue.get("external_ref", "")
try:
# Check if this issue already has a Jira reference
jira_key = None
if external_ref:
jira_key = self.extract_jira_key_from_external_ref(external_ref)
if jira_key:
# Issue exists in Jira
if self.create_only:
self.skipped.append((bd_id, f"Already in Jira as {jira_key} (--create-only)"))
return
# Update existing issue
if self.update_issue(jira_key, bd_issue):
self.updated.append((bd_id, jira_key))
else:
self.skipped.append((bd_id, f"No changes for {jira_key}"))
else:
# Create new issue
new_key = self.create_issue(bd_issue)
if new_key:
self.created.append((bd_id, new_key))
# Output the mapping for updating external_ref
if not self.dry_run:
new_ref = f"{self.jira_url}/browse/{new_key}"
print(
json.dumps({"bd_id": bd_id, "jira_key": new_key, "external_ref": new_ref}),
file=sys.stdout
)
except RuntimeError as e:
self.errors.append((bd_id, str(e)))
def process_issues(self, issues: List[Dict]) -> None:
"""Process all issues."""
total = len(issues)
for i, issue in enumerate(issues, 1):
print(f"Processing {i}/{total}: {issue.get('id', 'unknown')}...", file=sys.stderr)
self.process_issue(issue)
def print_summary(self) -> None:
"""Print summary of operations."""
print("\n--- Summary ---", file=sys.stderr)
print(f"Created: {len(self.created)}", file=sys.stderr)
for bd_id, jira_key in self.created:
print(f" {bd_id} -> {jira_key}", file=sys.stderr)
print(f"Updated: {len(self.updated)}", file=sys.stderr)
for bd_id, jira_key in self.updated:
print(f" {bd_id} -> {jira_key}", file=sys.stderr)
print(f"Skipped: {len(self.skipped)}", file=sys.stderr)
for bd_id, reason in self.skipped:
print(f" {bd_id}: {reason}", file=sys.stderr)
if self.errors:
print(f"Errors: {len(self.errors)}", file=sys.stderr)
for bd_id, error in self.errors:
print(f" {bd_id}: {error}", file=sys.stderr)
def update_bd_external_refs(mappings: List[Dict]) -> None:
"""Update bd issues with external_ref from created Jira issues."""
for mapping in mappings:
bd_id = mapping.get("bd_id")
external_ref = mapping.get("external_ref")
if bd_id and external_ref:
try:
subprocess.run(
["bd", "update", bd_id, f"--external-ref={external_ref}"],
capture_output=True,
timeout=10
)
except (subprocess.TimeoutExpired, FileNotFoundError):
print(f"Warning: Could not update external_ref for {bd_id}", file=sys.stderr)
def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Export bd issues to Jira",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Export all issues (create new, update existing)
bd export | python jsonl2jira.py --from-config
# Create only (don't update existing Jira issues)
bd export | python jsonl2jira.py --from-config --create-only
# Dry run (preview what would happen)
bd export | python jsonl2jira.py --from-config --dry-run
# From JSONL file
python jsonl2jira.py --from-config --file issues.jsonl
# Update bd with new external_refs
bd export | python jsonl2jira.py --from-config | while read line; do
bd_id=$(echo "$line" | jq -r '.bd_id')
ext_ref=$(echo "$line" | jq -r '.external_ref')
bd update "$bd_id" --external-ref="$ext_ref"
done
Configuration:
Set up bd config for easier usage:
bd config set jira.url "https://company.atlassian.net"
bd config set jira.project "PROJ"
bd config set jira.api_token "YOUR_TOKEN"
bd config set jira.username "your_email@company.com" # For Jira Cloud
Reverse field mappings (bd -> Jira):
bd config set jira.reverse_status_map.open "To Do"
bd config set jira.reverse_status_map.in_progress "In Progress"
bd config set jira.reverse_status_map.closed "Done"
bd config set jira.reverse_type_map.feature "Story"
bd config set jira.reverse_priority_map.0 "Highest"
"""
)
parser.add_argument(
"--url",
help="Jira instance URL (e.g., https://company.atlassian.net)"
)
parser.add_argument(
"--project",
help="Jira project key (e.g., PROJ)"
)
parser.add_argument(
"--file",
type=Path,
help="JSONL file containing bd issues (default: read from stdin)"
)
parser.add_argument(
"--from-config",
action="store_true",
help="Read Jira settings from bd config"
)
parser.add_argument(
"--username",
help="Jira username/email (or set JIRA_USERNAME env var)"
)
parser.add_argument(
"--api-token",
help="Jira API token (or set JIRA_API_TOKEN env var)"
)
parser.add_argument(
"--create-only",
action="store_true",
help="Only create new issues, don't update existing ones"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Preview what would happen without making changes"
)
parser.add_argument(
"--update-refs",
action="store_true",
help="Automatically update bd issues with external_ref after creation"
)
args = parser.parse_args()
# Resolve configuration
jira_url = args.url
project = args.project
username = args.username
api_token = args.api_token
if args.from_config:
if not jira_url:
jira_url = get_bd_config("jira.url")
if not project:
project = get_bd_config("jira.project")
if not username:
username = get_bd_config("jira.username")
if not api_token:
api_token = get_bd_config("jira.api_token")
# Environment variable fallbacks
if not api_token:
api_token = os.getenv("JIRA_API_TOKEN")
if not username:
username = os.getenv("JIRA_USERNAME")
# Validate
if not jira_url:
parser.error("--url is required (or use --from-config with jira.url configured)")
if not project:
parser.error("--project is required (or use --from-config with jira.project configured)")
if not api_token:
parser.error("Jira API token required. Set JIRA_API_TOKEN env var or pass --api-token")
# Load issues
issues = []
if args.file:
with open(args.file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
issues.append(json.loads(line))
else:
# Read from stdin
for line in sys.stdin:
line = line.strip()
if line:
issues.append(json.loads(line))
if not issues:
print("No issues to export", file=sys.stderr)
sys.exit(0)
print(f"Processing {len(issues)} issues...", file=sys.stderr)
# Create exporter and process
exporter = BeadsToJira(
jira_url=jira_url,
project=project,
username=username,
api_token=api_token,
create_only=args.create_only,
dry_run=args.dry_run
)
exporter.process_issues(issues)
exporter.print_summary()
# Optionally update bd external_refs
if args.update_refs and exporter.created and not args.dry_run:
print("\nUpdating bd issues with external_ref...", file=sys.stderr)
mappings = [
{"bd_id": bd_id, "external_ref": f"{jira_url}/browse/{jira_key}"}
for bd_id, jira_key in exporter.created
]
update_bd_external_refs(mappings)
# Exit with error if there were failures
if exporter.errors:
sys.exit(1)
if __name__ == "__main__":
main()