From d0a1e165e55bab28d949164a8e25107b83a132a5 Mon Sep 17 00:00:00 2001 From: dementus Date: Tue, 20 Jan 2026 19:41:33 -0800 Subject: [PATCH] feat(convoy): add redundant observers to Witness and Refinery Per PRIMING.md principle "Redundant Monitoring Is Resilience", add convoy completion checks to Witness and Refinery for redundant observation: - New internal/convoy/observer.go with shared CheckConvoysForIssue function - Witness: checks convoys after successful polecat nuke in HandleMerged - Refinery: checks convoys after closing source issue in both success handlers Multiple observers closing the same convoy is idempotent - each checks if convoy is already closed before running `gt convoy check`. Co-Authored-By: Claude Opus 4.5 --- internal/convoy/observer.go | 136 ++++++++++++++++++++++++++++++++++ internal/refinery/engineer.go | 13 ++++ internal/witness/handlers.go | 17 +++++ 3 files changed, 166 insertions(+) create mode 100644 internal/convoy/observer.go diff --git a/internal/convoy/observer.go b/internal/convoy/observer.go new file mode 100644 index 00000000..588dba87 --- /dev/null +++ b/internal/convoy/observer.go @@ -0,0 +1,136 @@ +// Package convoy provides shared convoy operations for redundant observers. +package convoy + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "path/filepath" + "strings" +) + +// CheckConvoysForIssue finds any convoys tracking the given issue and triggers +// convoy completion checks. This enables redundant convoy observation from +// multiple agents (Witness, Refinery, Daemon). +// +// The check is idempotent - running it multiple times for the same issue is safe. +// The underlying `gt convoy check` handles already-closed convoys gracefully. +// +// Parameters: +// - townRoot: path to the town root directory +// - issueID: the issue ID that was just closed +// - observer: identifier for logging (e.g., "witness", "refinery") +// - logger: optional logger function (can be nil) +// +// Returns the convoy IDs that were checked (may be empty if issue isn't tracked). +func CheckConvoysForIssue(townRoot, issueID, observer string, logger func(format string, args ...interface{})) []string { + if logger == nil { + logger = func(format string, args ...interface{}) {} // no-op + } + + // Find convoys tracking this issue + convoyIDs := getTrackingConvoys(townRoot, issueID) + if len(convoyIDs) == 0 { + return nil + } + + logger("%s: issue %s is tracked by %d convoy(s): %v", observer, issueID, len(convoyIDs), convoyIDs) + + // Run convoy check for each tracking convoy + // Note: gt convoy check is idempotent and handles already-closed convoys + for _, convoyID := range convoyIDs { + if isConvoyClosed(townRoot, convoyID) { + logger("%s: convoy %s already closed, skipping", observer, convoyID) + continue + } + + logger("%s: running convoy check for %s", observer, convoyID) + if err := runConvoyCheck(townRoot); err != nil { + logger("%s: convoy check failed: %v", observer, err) + } + } + + return convoyIDs +} + +// getTrackingConvoys returns convoy IDs that track the given issue. +// Uses direct SQLite query for efficiency (same approach as daemon/convoy_watcher). +func getTrackingConvoys(townRoot, issueID string) []string { + townBeads := filepath.Join(townRoot, ".beads") + dbPath := filepath.Join(townBeads, "beads.db") + + // Query for convoys that track this issue + // Handle both direct ID and external reference format + safeIssueID := strings.ReplaceAll(issueID, "'", "''") + + // Query for dependencies where this issue is the target + // Convoys use "tracks" type: convoy -> tracked issue (depends_on_id) + query := fmt.Sprintf(` + SELECT DISTINCT issue_id FROM dependencies + WHERE type = 'tracks' + AND (depends_on_id = '%s' OR depends_on_id LIKE '%%:%s') + `, safeIssueID, safeIssueID) + + queryCmd := exec.Command("sqlite3", "-json", dbPath, query) + var stdout bytes.Buffer + queryCmd.Stdout = &stdout + + if err := queryCmd.Run(); err != nil { + return nil + } + + var results []struct { + IssueID string `json:"issue_id"` + } + if err := json.Unmarshal(stdout.Bytes(), &results); err != nil { + return nil + } + + convoyIDs := make([]string, 0, len(results)) + for _, r := range results { + convoyIDs = append(convoyIDs, r.IssueID) + } + return convoyIDs +} + +// isConvoyClosed checks if a convoy is already closed. +func isConvoyClosed(townRoot, convoyID string) bool { + townBeads := filepath.Join(townRoot, ".beads") + dbPath := filepath.Join(townBeads, "beads.db") + + safeConvoyID := strings.ReplaceAll(convoyID, "'", "''") + query := fmt.Sprintf(`SELECT status FROM issues WHERE id = '%s'`, safeConvoyID) + + queryCmd := exec.Command("sqlite3", "-json", dbPath, query) + var stdout bytes.Buffer + queryCmd.Stdout = &stdout + + if err := queryCmd.Run(); err != nil { + return false + } + + var results []struct { + Status string `json:"status"` + } + if err := json.Unmarshal(stdout.Bytes(), &results); err != nil || len(results) == 0 { + return false + } + + return results[0].Status == "closed" +} + +// runConvoyCheck runs `gt convoy check` to close any completed convoys. +// This is idempotent and handles already-closed convoys gracefully. +func runConvoyCheck(townRoot string) error { + cmd := exec.Command("gt", "convoy", "check") + cmd.Dir = townRoot + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("%v: %s", err, stderr.String()) + } + + return nil +} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index b0f7ffde..526aca87 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -14,6 +14,7 @@ import ( "time" "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/convoy" "github.com/steveyegge/gastown/internal/git" "github.com/steveyegge/gastown/internal/mail" "github.com/steveyegge/gastown/internal/protocol" @@ -449,6 +450,12 @@ func (e *Engineer) handleSuccess(mr *beads.Issue, result ProcessResult) { _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mrFields.SourceIssue, err) } else { _, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mrFields.SourceIssue) + + // Redundant convoy observer: check if merged issue is tracked by a convoy + logger := func(format string, args ...interface{}) { + _, _ = fmt.Fprintf(e.output, "[Engineer] "+format+"\n", args...) + } + convoy.CheckConvoysForIssue(e.rig.Path, mrFields.SourceIssue, "refinery", logger) } } @@ -557,6 +564,12 @@ func (e *Engineer) HandleMRInfoSuccess(mr *MRInfo, result ProcessResult) { _, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err) } else { _, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue) + + // Redundant convoy observer: check if merged issue is tracked by a convoy + logger := func(format string, args ...interface{}) { + _, _ = fmt.Fprintf(e.output, "[Engineer] "+format+"\n", args...) + } + convoy.CheckConvoysForIssue(e.rig.Path, mr.SourceIssue, "refinery", logger) } } diff --git a/internal/witness/handlers.go b/internal/witness/handlers.go index a901a6a1..30a2d365 100644 --- a/internal/witness/handlers.go +++ b/internal/witness/handlers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/steveyegge/gastown/internal/beads" + "github.com/steveyegge/gastown/internal/convoy" "github.com/steveyegge/gastown/internal/git" "github.com/steveyegge/gastown/internal/mail" "github.com/steveyegge/gastown/internal/rig" @@ -264,6 +265,14 @@ func HandleMerged(workDir, rigName string, msg *mail.Message) *HandlerResult { result.Handled = true result.WispCreated = wispID result.Action = fmt.Sprintf("auto-nuked %s (cleanup_status=clean, wisp=%s)", payload.PolecatName, wispID) + + // Redundant convoy observer: check if completed issue is tracked by a convoy + if payload.IssueID != "" { + townRoot, _ := workspace.Find(workDir) + if townRoot != "" { + convoy.CheckConvoysForIssue(townRoot, payload.IssueID, "witness", nil) + } + } } case "has_uncommitted": @@ -299,6 +308,14 @@ func HandleMerged(workDir, rigName string, msg *mail.Message) *HandlerResult { result.Handled = true result.WispCreated = wispID result.Action = fmt.Sprintf("auto-nuked %s (commit on main, cleanup_status=%s, wisp=%s)", payload.PolecatName, cleanupStatus, wispID) + + // Redundant convoy observer: check if completed issue is tracked by a convoy + if payload.IssueID != "" { + townRoot, _ := workspace.Find(workDir) + if townRoot != "" { + convoy.CheckConvoysForIssue(townRoot, payload.IssueID, "witness", nil) + } + } } }