diff --git a/internal/beads/beads_queue.go b/internal/beads/beads_queue.go index 9cc54ec6..37abe08e 100644 --- a/internal/beads/beads_queue.go +++ b/internal/beads/beads_queue.go @@ -22,6 +22,8 @@ type QueueFields struct { ProcessingCount int // Number of items currently being processed CompletedCount int // Number of items completed FailedCount int // Number of items that failed + CreatedBy string // Who created this queue + CreatedAt string // ISO 8601 timestamp of creation } // Queue status constants @@ -78,6 +80,13 @@ func FormatQueueDescription(title string, fields *QueueFields) string { lines = append(lines, fmt.Sprintf("completed_count: %d", fields.CompletedCount)) lines = append(lines, fmt.Sprintf("failed_count: %d", fields.FailedCount)) + if fields.CreatedBy != "" { + lines = append(lines, fmt.Sprintf("created_by: %s", fields.CreatedBy)) + } + if fields.CreatedAt != "" { + lines = append(lines, fmt.Sprintf("created_at: %s", fields.CreatedAt)) + } + return strings.Join(lines, "\n") } @@ -137,6 +146,10 @@ func ParseQueueFields(description string) *QueueFields { if v, err := strconv.Atoi(value); err == nil { fields.FailedCount = v } + case "created_by": + fields.CreatedBy = value + case "created_at": + fields.CreatedAt = value } } diff --git a/internal/cmd/mail_queue.go b/internal/cmd/mail_queue.go index 9d71b19a..93799c96 100644 --- a/internal/cmd/mail_queue.go +++ b/internal/cmd/mail_queue.go @@ -415,3 +415,306 @@ func releaseQueueMessage(beadsDir, messageID, actor string) error { return nil } + +// Queue management commands (beads-native) + +var ( + mailQueueClaimers string + mailQueueJSON bool +) + +var mailQueueCmd = &cobra.Command{ + Use: "queue", + Short: "Manage mail queues", + Long: `Manage beads-native mail queues. + +Queues provide a way to distribute work to eligible workers. +Messages sent to a queue can be claimed by workers matching the claim pattern. + +COMMANDS: + create Create a new queue + show Show queue details + list List all queues + delete Delete a queue + +Examples: + gt mail queue create work --claimers 'gastown/polecats/*' + gt mail queue show work + gt mail queue list + gt mail queue delete work`, + RunE: requireSubcommand, +} + +var mailQueueCreateCmd = &cobra.Command{ + Use: "create ", + Short: "Create a new queue", + Long: `Create a new beads-native mail queue. + +The --claimers flag specifies a pattern for who can claim messages from this queue. +Patterns support wildcards: 'gastown/polecats/*' matches any polecat in gastown rig. + +Examples: + gt mail queue create work --claimers 'gastown/polecats/*' + gt mail queue create dispatch --claimers 'gastown/crew/*' + gt mail queue create urgent --claimers '*'`, + Args: cobra.ExactArgs(1), + RunE: runMailQueueCreate, +} + +var mailQueueShowCmd = &cobra.Command{ + Use: "show ", + Short: "Show queue details", + Long: `Show details about a mail queue. + +Displays the queue's claim pattern, status, and message counts. + +Examples: + gt mail queue show work + gt mail queue show dispatch --json`, + Args: cobra.ExactArgs(1), + RunE: runMailQueueShow, +} + +var mailQueueListCmd = &cobra.Command{ + Use: "list", + Short: "List all queues", + Long: `List all beads-native mail queues. + +Shows queue names, claim patterns, and status. + +Examples: + gt mail queue list + gt mail queue list --json`, + RunE: runMailQueueList, +} + +var mailQueueDeleteCmd = &cobra.Command{ + Use: "delete ", + Short: "Delete a queue", + Long: `Delete a mail queue. + +This permanently removes the queue bead. Messages in the queue are not affected. + +Examples: + gt mail queue delete work`, + Args: cobra.ExactArgs(1), + RunE: runMailQueueDelete, +} + +func init() { + // Queue create flags + mailQueueCreateCmd.Flags().StringVar(&mailQueueClaimers, "claimers", "", "Pattern for who can claim from this queue (required)") + _ = mailQueueCreateCmd.MarkFlagRequired("claimers") + + // Queue show/list flags + mailQueueShowCmd.Flags().BoolVar(&mailQueueJSON, "json", false, "Output as JSON") + mailQueueListCmd.Flags().BoolVar(&mailQueueJSON, "json", false, "Output as JSON") + + // Add queue subcommands + mailQueueCmd.AddCommand(mailQueueCreateCmd) + mailQueueCmd.AddCommand(mailQueueShowCmd) + mailQueueCmd.AddCommand(mailQueueListCmd) + mailQueueCmd.AddCommand(mailQueueDeleteCmd) + + // Add queue command to mail + mailCmd.AddCommand(mailQueueCmd) +} + +// runMailQueueCreate creates a new beads-native queue. +func runMailQueueCreate(cmd *cobra.Command, args []string) error { + queueName := args[0] + + // Find workspace + townRoot, err := workspace.FindFromCwdOrError() + if err != nil { + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + + // Get caller identity for created_by + caller := detectSender() + + // Create queue bead + b := beads.NewWithBeadsDir(townRoot, beads.ResolveBeadsDir(townRoot)) + + // Generate queue bead ID (town-level: hq-q-) + queueID := beads.QueueBeadID(queueName, true) + + // Check if queue already exists + existing, _, err := b.GetQueueBead(queueID) + if err != nil { + return fmt.Errorf("checking for existing queue: %w", err) + } + if existing != nil { + return fmt.Errorf("queue %q already exists", queueName) + } + + // Create queue fields + fields := &beads.QueueFields{ + Name: queueName, + ClaimPattern: mailQueueClaimers, + Status: beads.QueueStatusActive, + CreatedBy: caller, + CreatedAt: time.Now().Format(time.RFC3339), + } + + title := fmt.Sprintf("Queue: %s", queueName) + _, err = b.CreateQueueBead(queueID, title, fields) + if err != nil { + return fmt.Errorf("creating queue: %w", err) + } + + fmt.Printf("%s Created queue %s\n", style.Bold.Render("✓"), queueName) + fmt.Printf(" ID: %s\n", queueID) + fmt.Printf(" Claimers: %s\n", mailQueueClaimers) + + return nil +} + +// runMailQueueShow shows details about a queue. +func runMailQueueShow(cmd *cobra.Command, args []string) error { + queueName := args[0] + + // Find workspace + townRoot, err := workspace.FindFromCwdOrError() + if err != nil { + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + + // Get queue bead + b := beads.NewWithBeadsDir(townRoot, beads.ResolveBeadsDir(townRoot)) + + queueID := beads.QueueBeadID(queueName, true) + issue, fields, err := b.GetQueueBead(queueID) + if err != nil { + return fmt.Errorf("getting queue: %w", err) + } + if issue == nil { + return fmt.Errorf("queue %q not found", queueName) + } + + if mailQueueJSON { + output := map[string]interface{}{ + "id": issue.ID, + "name": fields.Name, + "claim_pattern": fields.ClaimPattern, + "status": fields.Status, + "available_count": fields.AvailableCount, + "processing_count": fields.ProcessingCount, + "completed_count": fields.CompletedCount, + "failed_count": fields.FailedCount, + "created_by": fields.CreatedBy, + "created_at": fields.CreatedAt, + } + jsonBytes, err := json.MarshalIndent(output, "", " ") + if err != nil { + return fmt.Errorf("marshaling JSON: %w", err) + } + fmt.Println(string(jsonBytes)) + return nil + } + + // Human-readable output + fmt.Printf("%s Queue: %s\n", style.Bold.Render("📬"), queueName) + fmt.Printf(" ID: %s\n", issue.ID) + fmt.Printf(" Claimers: %s\n", fields.ClaimPattern) + fmt.Printf(" Status: %s\n", fields.Status) + fmt.Printf(" Available: %d\n", fields.AvailableCount) + fmt.Printf(" Processing: %d\n", fields.ProcessingCount) + fmt.Printf(" Completed: %d\n", fields.CompletedCount) + if fields.FailedCount > 0 { + fmt.Printf(" Failed: %d\n", fields.FailedCount) + } + if fields.CreatedBy != "" { + fmt.Printf(" Created by: %s\n", fields.CreatedBy) + } + if fields.CreatedAt != "" { + fmt.Printf(" Created at: %s\n", fields.CreatedAt) + } + + return nil +} + +// runMailQueueList lists all queues. +func runMailQueueList(cmd *cobra.Command, args []string) error { + // Find workspace + townRoot, err := workspace.FindFromCwdOrError() + if err != nil { + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + + // List queue beads + b := beads.NewWithBeadsDir(townRoot, beads.ResolveBeadsDir(townRoot)) + + queues, err := b.ListQueueBeads() + if err != nil { + return fmt.Errorf("listing queues: %w", err) + } + + if len(queues) == 0 { + fmt.Printf("%s No queues found\n", style.Dim.Render("○")) + return nil + } + + if mailQueueJSON { + var output []map[string]interface{} + for _, issue := range queues { + fields := beads.ParseQueueFields(issue.Description) + output = append(output, map[string]interface{}{ + "id": issue.ID, + "name": fields.Name, + "claim_pattern": fields.ClaimPattern, + "status": fields.Status, + }) + } + jsonBytes, err := json.MarshalIndent(output, "", " ") + if err != nil { + return fmt.Errorf("marshaling JSON: %w", err) + } + fmt.Println(string(jsonBytes)) + return nil + } + + // Human-readable output + fmt.Printf("%s Queues (%d)\n\n", style.Bold.Render("📬"), len(queues)) + for _, issue := range queues { + fields := beads.ParseQueueFields(issue.Description) + fmt.Printf(" %s\n", style.Bold.Render(fields.Name)) + fmt.Printf(" Claimers: %s\n", fields.ClaimPattern) + fmt.Printf(" Status: %s\n", fields.Status) + } + + return nil +} + +// runMailQueueDelete deletes a queue. +func runMailQueueDelete(cmd *cobra.Command, args []string) error { + queueName := args[0] + + // Find workspace + townRoot, err := workspace.FindFromCwdOrError() + if err != nil { + return fmt.Errorf("not in a Gas Town workspace: %w", err) + } + + // Delete queue bead + b := beads.NewWithBeadsDir(townRoot, beads.ResolveBeadsDir(townRoot)) + + queueID := beads.QueueBeadID(queueName, true) + + // Verify queue exists + issue, _, err := b.GetQueueBead(queueID) + if err != nil { + return fmt.Errorf("getting queue: %w", err) + } + if issue == nil { + return fmt.Errorf("queue %q not found", queueName) + } + + if err := b.DeleteQueueBead(queueID); err != nil { + return fmt.Errorf("deleting queue: %w", err) + } + + fmt.Printf("%s Deleted queue %s\n", style.Bold.Render("✓"), queueName) + + return nil +}