feat(convoy): add redundant observers to Witness and Refinery
Per PRIMING.md principle "Redundant Monitoring Is Resilience", add convoy completion checks to Witness and Refinery for redundant observation: - New internal/convoy/observer.go with shared CheckConvoysForIssue function - Witness: checks convoys after successful polecat nuke in HandleMerged - Refinery: checks convoys after closing source issue in both success handlers Multiple observers closing the same convoy is idempotent - each checks if convoy is already closed before running `gt convoy check`. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
beads/crew/emma
parent
2b56ee2545
commit
d0a1e165e5
136
internal/convoy/observer.go
Normal file
136
internal/convoy/observer.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
// Package convoy provides shared convoy operations for redundant observers.
|
||||||
|
package convoy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CheckConvoysForIssue finds any convoys tracking the given issue and triggers
|
||||||
|
// convoy completion checks. This enables redundant convoy observation from
|
||||||
|
// multiple agents (Witness, Refinery, Daemon).
|
||||||
|
//
|
||||||
|
// The check is idempotent - running it multiple times for the same issue is safe.
|
||||||
|
// The underlying `gt convoy check` handles already-closed convoys gracefully.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - townRoot: path to the town root directory
|
||||||
|
// - issueID: the issue ID that was just closed
|
||||||
|
// - observer: identifier for logging (e.g., "witness", "refinery")
|
||||||
|
// - logger: optional logger function (can be nil)
|
||||||
|
//
|
||||||
|
// Returns the convoy IDs that were checked (may be empty if issue isn't tracked).
|
||||||
|
func CheckConvoysForIssue(townRoot, issueID, observer string, logger func(format string, args ...interface{})) []string {
|
||||||
|
if logger == nil {
|
||||||
|
logger = func(format string, args ...interface{}) {} // no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find convoys tracking this issue
|
||||||
|
convoyIDs := getTrackingConvoys(townRoot, issueID)
|
||||||
|
if len(convoyIDs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger("%s: issue %s is tracked by %d convoy(s): %v", observer, issueID, len(convoyIDs), convoyIDs)
|
||||||
|
|
||||||
|
// Run convoy check for each tracking convoy
|
||||||
|
// Note: gt convoy check is idempotent and handles already-closed convoys
|
||||||
|
for _, convoyID := range convoyIDs {
|
||||||
|
if isConvoyClosed(townRoot, convoyID) {
|
||||||
|
logger("%s: convoy %s already closed, skipping", observer, convoyID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
logger("%s: running convoy check for %s", observer, convoyID)
|
||||||
|
if err := runConvoyCheck(townRoot); err != nil {
|
||||||
|
logger("%s: convoy check failed: %v", observer, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return convoyIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// getTrackingConvoys returns convoy IDs that track the given issue.
|
||||||
|
// Uses direct SQLite query for efficiency (same approach as daemon/convoy_watcher).
|
||||||
|
func getTrackingConvoys(townRoot, issueID string) []string {
|
||||||
|
townBeads := filepath.Join(townRoot, ".beads")
|
||||||
|
dbPath := filepath.Join(townBeads, "beads.db")
|
||||||
|
|
||||||
|
// Query for convoys that track this issue
|
||||||
|
// Handle both direct ID and external reference format
|
||||||
|
safeIssueID := strings.ReplaceAll(issueID, "'", "''")
|
||||||
|
|
||||||
|
// Query for dependencies where this issue is the target
|
||||||
|
// Convoys use "tracks" type: convoy -> tracked issue (depends_on_id)
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
SELECT DISTINCT issue_id FROM dependencies
|
||||||
|
WHERE type = 'tracks'
|
||||||
|
AND (depends_on_id = '%s' OR depends_on_id LIKE '%%:%s')
|
||||||
|
`, safeIssueID, safeIssueID)
|
||||||
|
|
||||||
|
queryCmd := exec.Command("sqlite3", "-json", dbPath, query)
|
||||||
|
var stdout bytes.Buffer
|
||||||
|
queryCmd.Stdout = &stdout
|
||||||
|
|
||||||
|
if err := queryCmd.Run(); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []struct {
|
||||||
|
IssueID string `json:"issue_id"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(stdout.Bytes(), &results); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
convoyIDs := make([]string, 0, len(results))
|
||||||
|
for _, r := range results {
|
||||||
|
convoyIDs = append(convoyIDs, r.IssueID)
|
||||||
|
}
|
||||||
|
return convoyIDs
|
||||||
|
}
|
||||||
|
|
||||||
|
// isConvoyClosed checks if a convoy is already closed.
|
||||||
|
func isConvoyClosed(townRoot, convoyID string) bool {
|
||||||
|
townBeads := filepath.Join(townRoot, ".beads")
|
||||||
|
dbPath := filepath.Join(townBeads, "beads.db")
|
||||||
|
|
||||||
|
safeConvoyID := strings.ReplaceAll(convoyID, "'", "''")
|
||||||
|
query := fmt.Sprintf(`SELECT status FROM issues WHERE id = '%s'`, safeConvoyID)
|
||||||
|
|
||||||
|
queryCmd := exec.Command("sqlite3", "-json", dbPath, query)
|
||||||
|
var stdout bytes.Buffer
|
||||||
|
queryCmd.Stdout = &stdout
|
||||||
|
|
||||||
|
if err := queryCmd.Run(); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(stdout.Bytes(), &results); err != nil || len(results) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return results[0].Status == "closed"
|
||||||
|
}
|
||||||
|
|
||||||
|
// runConvoyCheck runs `gt convoy check` to close any completed convoys.
|
||||||
|
// This is idempotent and handles already-closed convoys gracefully.
|
||||||
|
func runConvoyCheck(townRoot string) error {
|
||||||
|
cmd := exec.Command("gt", "convoy", "check")
|
||||||
|
cmd.Dir = townRoot
|
||||||
|
var stderr bytes.Buffer
|
||||||
|
cmd.Stderr = &stderr
|
||||||
|
|
||||||
|
if err := cmd.Run(); err != nil {
|
||||||
|
return fmt.Errorf("%v: %s", err, stderr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
|
"github.com/steveyegge/gastown/internal/convoy"
|
||||||
"github.com/steveyegge/gastown/internal/git"
|
"github.com/steveyegge/gastown/internal/git"
|
||||||
"github.com/steveyegge/gastown/internal/mail"
|
"github.com/steveyegge/gastown/internal/mail"
|
||||||
"github.com/steveyegge/gastown/internal/protocol"
|
"github.com/steveyegge/gastown/internal/protocol"
|
||||||
@@ -449,6 +450,12 @@ func (e *Engineer) handleSuccess(mr *beads.Issue, result ProcessResult) {
|
|||||||
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mrFields.SourceIssue, err)
|
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mrFields.SourceIssue, err)
|
||||||
} else {
|
} else {
|
||||||
_, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mrFields.SourceIssue)
|
_, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mrFields.SourceIssue)
|
||||||
|
|
||||||
|
// Redundant convoy observer: check if merged issue is tracked by a convoy
|
||||||
|
logger := func(format string, args ...interface{}) {
|
||||||
|
_, _ = fmt.Fprintf(e.output, "[Engineer] "+format+"\n", args...)
|
||||||
|
}
|
||||||
|
convoy.CheckConvoysForIssue(e.rig.Path, mrFields.SourceIssue, "refinery", logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -557,6 +564,12 @@ func (e *Engineer) HandleMRInfoSuccess(mr *MRInfo, result ProcessResult) {
|
|||||||
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err)
|
_, _ = fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err)
|
||||||
} else {
|
} else {
|
||||||
_, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue)
|
_, _ = fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue)
|
||||||
|
|
||||||
|
// Redundant convoy observer: check if merged issue is tracked by a convoy
|
||||||
|
logger := func(format string, args ...interface{}) {
|
||||||
|
_, _ = fmt.Fprintf(e.output, "[Engineer] "+format+"\n", args...)
|
||||||
|
}
|
||||||
|
convoy.CheckConvoysForIssue(e.rig.Path, mr.SourceIssue, "refinery", logger)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
|
"github.com/steveyegge/gastown/internal/convoy"
|
||||||
"github.com/steveyegge/gastown/internal/git"
|
"github.com/steveyegge/gastown/internal/git"
|
||||||
"github.com/steveyegge/gastown/internal/mail"
|
"github.com/steveyegge/gastown/internal/mail"
|
||||||
"github.com/steveyegge/gastown/internal/rig"
|
"github.com/steveyegge/gastown/internal/rig"
|
||||||
@@ -264,6 +265,14 @@ func HandleMerged(workDir, rigName string, msg *mail.Message) *HandlerResult {
|
|||||||
result.Handled = true
|
result.Handled = true
|
||||||
result.WispCreated = wispID
|
result.WispCreated = wispID
|
||||||
result.Action = fmt.Sprintf("auto-nuked %s (cleanup_status=clean, wisp=%s)", payload.PolecatName, wispID)
|
result.Action = fmt.Sprintf("auto-nuked %s (cleanup_status=clean, wisp=%s)", payload.PolecatName, wispID)
|
||||||
|
|
||||||
|
// Redundant convoy observer: check if completed issue is tracked by a convoy
|
||||||
|
if payload.IssueID != "" {
|
||||||
|
townRoot, _ := workspace.Find(workDir)
|
||||||
|
if townRoot != "" {
|
||||||
|
convoy.CheckConvoysForIssue(townRoot, payload.IssueID, "witness", nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case "has_uncommitted":
|
case "has_uncommitted":
|
||||||
@@ -299,6 +308,14 @@ func HandleMerged(workDir, rigName string, msg *mail.Message) *HandlerResult {
|
|||||||
result.Handled = true
|
result.Handled = true
|
||||||
result.WispCreated = wispID
|
result.WispCreated = wispID
|
||||||
result.Action = fmt.Sprintf("auto-nuked %s (commit on main, cleanup_status=%s, wisp=%s)", payload.PolecatName, cleanupStatus, wispID)
|
result.Action = fmt.Sprintf("auto-nuked %s (commit on main, cleanup_status=%s, wisp=%s)", payload.PolecatName, cleanupStatus, wispID)
|
||||||
|
|
||||||
|
// Redundant convoy observer: check if completed issue is tracked by a convoy
|
||||||
|
if payload.IssueID != "" {
|
||||||
|
townRoot, _ := workspace.Find(workDir)
|
||||||
|
if townRoot != "" {
|
||||||
|
convoy.CheckConvoysForIssue(townRoot, payload.IssueID, "witness", nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user