feat(beads): add queue bead type

Add queue bead type for tracking work queues in Gas Town. This includes:
- QueueFields struct with status, concurrency, processing order, and counts
- Parse/Format functions for queue field serialization
- CRUD methods: CreateQueueBead, GetQueueBead, UpdateQueueFields, etc.
- Queue registered in BeadsCustomTypes for bd CLI support

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gastown/crew/dennis
2026-01-14 21:10:50 -08:00
committed by Steve Yegge
parent 0bf68de517
commit a244c3d498
2 changed files with 270 additions and 2 deletions

View File

@@ -0,0 +1,268 @@
// Package beads provides queue bead management.
package beads
import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
)
// QueueFields holds structured fields for queue beads.
// These are stored as "key: value" lines in the description.
type QueueFields struct {
Name string // Queue name (human-readable identifier)
Status string // active, paused, closed
MaxConcurrency int // Maximum number of concurrent workers (0 = unlimited)
ProcessingOrder string // fifo, priority (default: fifo)
AvailableCount int // Number of items ready to process
ProcessingCount int // Number of items currently being processed
CompletedCount int // Number of items completed
FailedCount int // Number of items that failed
}
// Queue status constants
const (
QueueStatusActive = "active"
QueueStatusPaused = "paused"
QueueStatusClosed = "closed"
)
// Queue processing order constants
const (
QueueOrderFIFO = "fifo"
QueueOrderPriority = "priority"
)
// FormatQueueDescription creates a description string from queue fields.
func FormatQueueDescription(title string, fields *QueueFields) string {
if fields == nil {
return title
}
var lines []string
lines = append(lines, title)
lines = append(lines, "")
if fields.Name != "" {
lines = append(lines, fmt.Sprintf("name: %s", fields.Name))
} else {
lines = append(lines, "name: null")
}
if fields.Status != "" {
lines = append(lines, fmt.Sprintf("status: %s", fields.Status))
} else {
lines = append(lines, "status: active")
}
lines = append(lines, fmt.Sprintf("max_concurrency: %d", fields.MaxConcurrency))
if fields.ProcessingOrder != "" {
lines = append(lines, fmt.Sprintf("processing_order: %s", fields.ProcessingOrder))
} else {
lines = append(lines, "processing_order: fifo")
}
lines = append(lines, fmt.Sprintf("available_count: %d", fields.AvailableCount))
lines = append(lines, fmt.Sprintf("processing_count: %d", fields.ProcessingCount))
lines = append(lines, fmt.Sprintf("completed_count: %d", fields.CompletedCount))
lines = append(lines, fmt.Sprintf("failed_count: %d", fields.FailedCount))
return strings.Join(lines, "\n")
}
// ParseQueueFields extracts queue fields from an issue's description.
func ParseQueueFields(description string) *QueueFields {
fields := &QueueFields{
Status: QueueStatusActive,
ProcessingOrder: QueueOrderFIFO,
}
for _, line := range strings.Split(description, "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
colonIdx := strings.Index(line, ":")
if colonIdx == -1 {
continue
}
key := strings.TrimSpace(line[:colonIdx])
value := strings.TrimSpace(line[colonIdx+1:])
if value == "null" || value == "" {
value = ""
}
switch strings.ToLower(key) {
case "name":
fields.Name = value
case "status":
fields.Status = value
case "max_concurrency":
if v, err := strconv.Atoi(value); err == nil {
fields.MaxConcurrency = v
}
case "processing_order":
fields.ProcessingOrder = value
case "available_count":
if v, err := strconv.Atoi(value); err == nil {
fields.AvailableCount = v
}
case "processing_count":
if v, err := strconv.Atoi(value); err == nil {
fields.ProcessingCount = v
}
case "completed_count":
if v, err := strconv.Atoi(value); err == nil {
fields.CompletedCount = v
}
case "failed_count":
if v, err := strconv.Atoi(value); err == nil {
fields.FailedCount = v
}
}
}
return fields
}
// QueueBeadID returns the queue bead ID for a given queue name.
// Format: hq-q-<name> for town-level queues, gt-q-<name> for rig-level queues.
func QueueBeadID(name string, isTownLevel bool) string {
if isTownLevel {
return "hq-q-" + name
}
return "gt-q-" + name
}
// CreateQueueBead creates a queue bead for tracking work queues.
// The ID format is: <prefix>-q-<name> (e.g., gt-q-merge, hq-q-dispatch)
// The created_by field is populated from BD_ACTOR env var for provenance tracking.
func (b *Beads) CreateQueueBead(id, title string, fields *QueueFields) (*Issue, error) {
description := FormatQueueDescription(title, fields)
args := []string{"create", "--json",
"--id=" + id,
"--title=" + title,
"--description=" + description,
"--type=queue",
"--labels=gt:queue",
}
// Default actor from BD_ACTOR env var for provenance tracking
if actor := os.Getenv("BD_ACTOR"); actor != "" {
args = append(args, "--actor="+actor)
}
out, err := b.run(args...)
if err != nil {
return nil, err
}
var issue Issue
if err := json.Unmarshal(out, &issue); err != nil {
return nil, fmt.Errorf("parsing bd create output: %w", err)
}
return &issue, nil
}
// GetQueueBead retrieves a queue bead by ID.
// Returns nil if not found.
func (b *Beads) GetQueueBead(id string) (*Issue, *QueueFields, error) {
issue, err := b.Show(id)
if err != nil {
if errors.Is(err, ErrNotFound) {
return nil, nil, nil
}
return nil, nil, err
}
if !HasLabel(issue, "gt:queue") {
return nil, nil, fmt.Errorf("issue %s is not a queue bead (missing gt:queue label)", id)
}
fields := ParseQueueFields(issue.Description)
return issue, fields, nil
}
// UpdateQueueFields updates the fields of a queue bead.
func (b *Beads) UpdateQueueFields(id string, fields *QueueFields) error {
issue, err := b.Show(id)
if err != nil {
return err
}
description := FormatQueueDescription(issue.Title, fields)
return b.Update(id, UpdateOptions{Description: &description})
}
// UpdateQueueCounts updates the count fields of a queue bead.
// This is a convenience method for incrementing/decrementing counts.
func (b *Beads) UpdateQueueCounts(id string, available, processing, completed, failed int) error {
issue, currentFields, err := b.GetQueueBead(id)
if err != nil {
return err
}
if issue == nil {
return ErrNotFound
}
currentFields.AvailableCount = available
currentFields.ProcessingCount = processing
currentFields.CompletedCount = completed
currentFields.FailedCount = failed
return b.UpdateQueueFields(id, currentFields)
}
// UpdateQueueStatus updates the status of a queue bead.
func (b *Beads) UpdateQueueStatus(id, status string) error {
// Validate status
if status != QueueStatusActive && status != QueueStatusPaused && status != QueueStatusClosed {
return fmt.Errorf("invalid queue status %q: must be active, paused, or closed", status)
}
issue, currentFields, err := b.GetQueueBead(id)
if err != nil {
return err
}
if issue == nil {
return ErrNotFound
}
currentFields.Status = status
return b.UpdateQueueFields(id, currentFields)
}
// ListQueueBeads returns all queue beads.
func (b *Beads) ListQueueBeads() (map[string]*Issue, error) {
out, err := b.run("list", "--label=gt:queue", "--json")
if err != nil {
return nil, err
}
var issues []*Issue
if err := json.Unmarshal(out, &issues); err != nil {
return nil, fmt.Errorf("parsing bd list output: %w", err)
}
result := make(map[string]*Issue, len(issues))
for _, issue := range issues {
result[issue.ID] = issue
}
return result, nil
}
// DeleteQueueBead permanently deletes a queue bead.
// Uses --hard --force for immediate permanent deletion (no tombstone).
func (b *Beads) DeleteQueueBead(id string) error {
_, err := b.run("delete", id, "--hard", "--force")
return err
}

View File

@@ -83,12 +83,12 @@ const (
// BeadsCustomTypes is the comma-separated list of custom issue types that
// Gas Town registers with beads. These types were extracted from beads core
// in v0.46.0 and now require explicit configuration.
BeadsCustomTypes = "agent,role,rig,convoy,slot"
BeadsCustomTypes = "agent,role,rig,convoy,slot,queue"
)
// BeadsCustomTypesList returns the custom types as a slice.
func BeadsCustomTypesList() []string {
return []string{"agent", "role", "rig", "convoy", "slot"}
return []string{"agent", "role", "rig", "convoy", "slot", "queue"}
}
// Git branch names.