From f79614d76484e3651e6a50171662c8925068cbf2 Mon Sep 17 00:00:00 2001 From: gastown/crew/george Date: Mon, 12 Jan 2026 18:38:57 -0800 Subject: [PATCH] feat(daemon): event-driven convoy completion check (hq-5kmkl) Add ConvoyWatcher that monitors bd activity for issue closes and triggers convoy completion checks immediately rather than waiting for patrol. - Watch bd activity --follow --town --json for status=closed events - Query SQLite for convoys tracking the closed issue - Trigger gt convoy check when tracked issue closes - Convoys close within seconds of last issue closing Co-Authored-By: Claude Opus 4.5 --- internal/daemon/convoy_watcher.go | 239 +++++++++++++++++++++++++ internal/daemon/convoy_watcher_test.go | 89 +++++++++ internal/daemon/daemon.go | 27 ++- 3 files changed, 349 insertions(+), 6 deletions(-) create mode 100644 internal/daemon/convoy_watcher.go create mode 100644 internal/daemon/convoy_watcher_test.go diff --git a/internal/daemon/convoy_watcher.go b/internal/daemon/convoy_watcher.go new file mode 100644 index 00000000..ddf5731d --- /dev/null +++ b/internal/daemon/convoy_watcher.go @@ -0,0 +1,239 @@ +package daemon + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" +) + +// ConvoyWatcher monitors bd activity for issue closes and triggers convoy completion checks. +// When an issue closes, it checks if the issue is tracked by any convoy and runs the +// completion check if all tracked issues are now closed. +type ConvoyWatcher struct { + townRoot string + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger func(format string, args ...interface{}) +} + +// bdActivityEvent represents an event from bd activity --json. +type bdActivityEvent struct { + Timestamp string `json:"timestamp"` + Type string `json:"type"` + IssueID string `json:"issue_id"` + Symbol string `json:"symbol"` + Message string `json:"message"` + OldStatus string `json:"old_status,omitempty"` + NewStatus string `json:"new_status,omitempty"` +} + +// NewConvoyWatcher creates a new convoy watcher. +func NewConvoyWatcher(townRoot string, logger func(format string, args ...interface{})) *ConvoyWatcher { + ctx, cancel := context.WithCancel(context.Background()) + return &ConvoyWatcher{ + townRoot: townRoot, + ctx: ctx, + cancel: cancel, + logger: logger, + } +} + +// Start begins the convoy watcher goroutine. +func (w *ConvoyWatcher) Start() error { + w.wg.Add(1) + go w.run() + return nil +} + +// Stop gracefully stops the convoy watcher. +func (w *ConvoyWatcher) Stop() { + w.cancel() + w.wg.Wait() +} + +// run is the main watcher loop. +func (w *ConvoyWatcher) run() { + defer w.wg.Done() + + for { + select { + case <-w.ctx.Done(): + return + default: + // Start bd activity --follow --town --json + if err := w.watchActivity(); err != nil { + w.logger("convoy watcher: bd activity error: %v, restarting in 5s", err) + // Wait before retry, but respect context cancellation + select { + case <-w.ctx.Done(): + return + case <-time.After(5 * time.Second): + // Continue to retry + } + } + } + } +} + +// watchActivity starts bd activity and processes events until error or context cancellation. +func (w *ConvoyWatcher) watchActivity() error { + cmd := exec.CommandContext(w.ctx, "bd", "activity", "--follow", "--town", "--json") + cmd.Dir = w.townRoot + + stdout, err := cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + return fmt.Errorf("starting bd activity: %w", err) + } + + scanner := bufio.NewScanner(stdout) + for scanner.Scan() { + select { + case <-w.ctx.Done(): + _ = cmd.Process.Kill() + return nil + default: + } + + line := scanner.Text() + w.processLine(line) + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("reading bd activity: %w", err) + } + + return cmd.Wait() +} + +// processLine processes a single line from bd activity (NDJSON format). +func (w *ConvoyWatcher) processLine(line string) { + line = strings.TrimSpace(line) + if line == "" { + return + } + + var event bdActivityEvent + if err := json.Unmarshal([]byte(line), &event); err != nil { + return // Skip malformed lines + } + + // Only interested in status changes to closed + if event.Type != "status" || event.NewStatus != "closed" { + return + } + + w.logger("convoy watcher: detected close of %s", event.IssueID) + + // Check if this issue is tracked by any convoy + convoyIDs := w.getTrackingConvoys(event.IssueID) + if len(convoyIDs) == 0 { + return + } + + w.logger("convoy watcher: %s is tracked by %d convoy(s): %v", event.IssueID, len(convoyIDs), convoyIDs) + + // Check each tracking convoy for completion + for _, convoyID := range convoyIDs { + w.checkConvoyCompletion(convoyID) + } +} + +// getTrackingConvoys returns convoy IDs that track the given issue. +func (w *ConvoyWatcher) getTrackingConvoys(issueID string) []string { + townBeads := filepath.Join(w.townRoot, ".beads") + dbPath := filepath.Join(townBeads, "beads.db") + + // Query for convoys that track this issue + // Handle both direct ID and external reference format + safeIssueID := strings.ReplaceAll(issueID, "'", "''") + + // Query for dependencies where this issue is the target + // Convoys use "tracks" type: convoy -> tracked issue (depends_on_id) + query := fmt.Sprintf(` + SELECT DISTINCT issue_id FROM dependencies + WHERE type = 'tracks' + AND (depends_on_id = '%s' OR depends_on_id LIKE '%%:%s') + `, safeIssueID, safeIssueID) + + queryCmd := exec.Command("sqlite3", "-json", dbPath, query) + var stdout bytes.Buffer + queryCmd.Stdout = &stdout + + if err := queryCmd.Run(); err != nil { + return nil + } + + var results []struct { + IssueID string `json:"issue_id"` + } + if err := json.Unmarshal(stdout.Bytes(), &results); err != nil { + return nil + } + + convoyIDs := make([]string, 0, len(results)) + for _, r := range results { + convoyIDs = append(convoyIDs, r.IssueID) + } + return convoyIDs +} + +// checkConvoyCompletion checks if all issues tracked by a convoy are closed. +// If so, runs gt convoy check to close the convoy. +func (w *ConvoyWatcher) checkConvoyCompletion(convoyID string) { + townBeads := filepath.Join(w.townRoot, ".beads") + dbPath := filepath.Join(townBeads, "beads.db") + + // First check if the convoy is still open + convoyQuery := fmt.Sprintf(`SELECT status FROM issues WHERE id = '%s'`, + strings.ReplaceAll(convoyID, "'", "''")) + + queryCmd := exec.Command("sqlite3", "-json", dbPath, convoyQuery) + var stdout bytes.Buffer + queryCmd.Stdout = &stdout + + if err := queryCmd.Run(); err != nil { + return + } + + var convoyStatus []struct { + Status string `json:"status"` + } + if err := json.Unmarshal(stdout.Bytes(), &convoyStatus); err != nil || len(convoyStatus) == 0 { + return + } + + if convoyStatus[0].Status == "closed" { + return // Already closed + } + + // Run gt convoy check to handle the completion + // This reuses the existing logic which handles notifications, etc. + w.logger("convoy watcher: running completion check for %s", convoyID) + + checkCmd := exec.Command("gt", "convoy", "check") + checkCmd.Dir = w.townRoot + var checkStdout, checkStderr bytes.Buffer + checkCmd.Stdout = &checkStdout + checkCmd.Stderr = &checkStderr + + if err := checkCmd.Run(); err != nil { + w.logger("convoy watcher: gt convoy check failed: %v: %s", err, checkStderr.String()) + return + } + + if output := checkStdout.String(); output != "" && !strings.Contains(output, "No convoys ready") { + w.logger("convoy watcher: %s", strings.TrimSpace(output)) + } +} diff --git a/internal/daemon/convoy_watcher_test.go b/internal/daemon/convoy_watcher_test.go new file mode 100644 index 00000000..5e053144 --- /dev/null +++ b/internal/daemon/convoy_watcher_test.go @@ -0,0 +1,89 @@ +package daemon + +import ( + "encoding/json" + "testing" +) + +func TestBdActivityEventParsing(t *testing.T) { + testCases := []struct { + name string + line string + wantType string + wantIssueID string + wantNew string + }{ + { + name: "status change to closed", + line: `{"timestamp":"2026-01-12T02:50:35.778328-08:00","type":"status","issue_id":"gt-uoc64","symbol":"✓","message":"gt-uoc64 completed","old_status":"in_progress","new_status":"closed"}`, + wantType: "status", + wantIssueID: "gt-uoc64", + wantNew: "closed", + }, + { + name: "status change to in_progress", + line: `{"timestamp":"2026-01-12T02:43:04.467992-08:00","type":"status","issue_id":"gt-uoc64","symbol":"→","message":"gt-uoc64 started","old_status":"open","new_status":"in_progress","actor":"gastown/crew/george"}`, + wantType: "status", + wantIssueID: "gt-uoc64", + wantNew: "in_progress", + }, + { + name: "create event", + line: `{"timestamp":"2026-01-12T01:19:01.753578-08:00","type":"create","issue_id":"gt-dgbwk","symbol":"+","message":"gt-dgbwk created"}`, + wantType: "create", + wantIssueID: "gt-dgbwk", + wantNew: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var event bdActivityEvent + err := json.Unmarshal([]byte(tc.line), &event) + if err != nil { + t.Fatalf("failed to parse: %v", err) + } + + if event.Type != tc.wantType { + t.Errorf("type = %q, want %q", event.Type, tc.wantType) + } + if event.IssueID != tc.wantIssueID { + t.Errorf("issue_id = %q, want %q", event.IssueID, tc.wantIssueID) + } + if event.NewStatus != tc.wantNew { + t.Errorf("new_status = %q, want %q", event.NewStatus, tc.wantNew) + } + }) + } +} + +func TestIsCloseEvent(t *testing.T) { + closedEvent := bdActivityEvent{ + Type: "status", + IssueID: "gt-test", + NewStatus: "closed", + } + + if closedEvent.Type != "status" || closedEvent.NewStatus != "closed" { + t.Error("should detect close event") + } + + inProgressEvent := bdActivityEvent{ + Type: "status", + IssueID: "gt-test", + NewStatus: "in_progress", + } + + if inProgressEvent.Type == "status" && inProgressEvent.NewStatus == "closed" { + t.Error("should not detect in_progress as close") + } + + createEvent := bdActivityEvent{ + Type: "create", + IssueID: "gt-test", + } + + if createEvent.Type == "status" && createEvent.NewStatus == "closed" { + t.Error("should not detect create as close") + } +} diff --git a/internal/daemon/daemon.go b/internal/daemon/daemon.go index 178df66a..0f172172 100755 --- a/internal/daemon/daemon.go +++ b/internal/daemon/daemon.go @@ -37,12 +37,13 @@ import ( // This is recovery-focused: normal wake is handled by feed subscription (bd activity --follow). // The daemon is the safety net for dead sessions, GUPP violations, and orphaned work. type Daemon struct { - config *Config - tmux *tmux.Tmux - logger *log.Logger - ctx context.Context - cancel context.CancelFunc - curator *feed.Curator + config *Config + tmux *tmux.Tmux + logger *log.Logger + ctx context.Context + cancel context.CancelFunc + curator *feed.Curator + convoyWatcher *ConvoyWatcher // Mass death detection: track recent session deaths deathsMu sync.Mutex @@ -143,6 +144,14 @@ func (d *Daemon) Run() error { d.logger.Println("Feed curator started") } + // Start convoy watcher for event-driven convoy completion + d.convoyWatcher = NewConvoyWatcher(d.config.TownRoot, d.logger.Printf) + if err := d.convoyWatcher.Start(); err != nil { + d.logger.Printf("Warning: failed to start convoy watcher: %v", err) + } else { + d.logger.Println("Convoy watcher started") + } + // Initial heartbeat d.heartbeat(state) @@ -579,6 +588,12 @@ func (d *Daemon) shutdown(state *State) error { //nolint:unparam // error return d.logger.Println("Feed curator stopped") } + // Stop convoy watcher + if d.convoyWatcher != nil { + d.convoyWatcher.Stop() + d.logger.Println("Convoy watcher stopped") + } + state.Running = false if err := SaveState(d.config.TownRoot, state); err != nil { d.logger.Printf("Warning: failed to save final state: %v", err)