Files
gastown/internal/daemon/convoy_watcher.go
Jackson Cantrell 0fb3e8d5fe fix: inherit environment in daemon subprocess calls (#876)
The daemon's exec.Command calls were not explicitly setting cmd.Env,
causing subprocesses to fail when the daemon process doesn't have
the expected PATH environment variable. This manifests as:

  Warning: failed to fetch deacon inbox: exec: "gt": executable file not found in $PATH

When the daemon is started by mechanisms with minimal environments
(launchd, systemd, or shells without full PATH), executables like
gt, bd, git, and sqlite3 couldn't be found.

The fix adds cmd.Env = os.Environ() to all 15 subprocess calls across
three files, ensuring they inherit the daemon's full environment.

Affected commands:
- gt mail inbox/delete/send (lifecycle requests, notifications)
- bd sync/show/list/activity (beads operations)
- git fetch/pull (workspace pre-sync)
- sqlite3 (convoy completion queries)

Fixes #875

Co-authored-by: Jackson Cantrell <cantrelljax@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-22 16:41:51 -08:00

245 lines
6.5 KiB
Go

package daemon
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
// ConvoyWatcher follows the `bd activity` event stream and reacts to issue
// closes. For each closed issue it looks up the convoys that track it and,
// for every such convoy that is not already closed, runs `gt convoy check`.
type ConvoyWatcher struct {
	// townRoot is the working directory for the bd and gt subprocesses; the
	// beads database is read from <townRoot>/.beads/beads.db.
	townRoot string

	// ctx and cancel bound the watcher's lifetime. Stop calls cancel, which
	// ends the run loop and the bd activity subprocess started from ctx.
	ctx    context.Context
	cancel context.CancelFunc

	// wg tracks the background goroutine so Stop can wait for it to exit.
	wg sync.WaitGroup

	// logger receives printf-style diagnostics. It is invoked without a nil
	// check, so it must be non-nil once the watcher is started.
	logger func(format string, args ...interface{})
}
// bdActivityEvent is the decoded form of one JSON line emitted by
// `bd activity --json`. Only Type, NewStatus and IssueID are consulted by the
// watcher; the remaining fields are decoded but not otherwise used in this file.
type bdActivityEvent struct {
	// Timestamp is kept as the raw string from the stream; it is not parsed here.
	Timestamp string `json:"timestamp"`

	// Type is the event kind. The watcher acts only on "status" events.
	Type string `json:"type"`

	// IssueID identifies the issue the event refers to.
	IssueID string `json:"issue_id"`

	// Symbol and Message are presumably display-oriented fields of the
	// activity feed; nothing in this file reads them.
	Symbol  string `json:"symbol"`
	Message string `json:"message"`

	// OldStatus and NewStatus are optional in the JSON. The watcher reacts
	// when NewStatus is "closed".
	OldStatus string `json:"old_status,omitempty"`
	NewStatus string `json:"new_status,omitempty"`
}
// NewConvoyWatcher builds a watcher rooted at townRoot that reports through
// logger. The watcher owns a fresh cancellable context derived from
// context.Background; nothing runs until Start is called.
func NewConvoyWatcher(townRoot string, logger func(format string, args ...interface{})) *ConvoyWatcher {
	watcher := &ConvoyWatcher{
		townRoot: townRoot,
		logger:   logger,
	}
	watcher.ctx, watcher.cancel = context.WithCancel(context.Background())
	return watcher
}
// Start launches the watcher loop on a background goroutine and returns
// immediately. The returned error is always nil.
func (w *ConvoyWatcher) Start() error {
	// Register the goroutine before spawning it; run calls Done when it exits.
	w.wg.Add(1)
	go w.run()

	return nil
}
// Stop cancels the watcher's context and blocks until the background
// goroutine started by Start has returned.
func (w *ConvoyWatcher) Stop() {
	// Signal shutdown first, then wait for the run loop to observe it.
	w.cancel()

	w.wg.Wait()
}
// run is the watcher's main loop. It keeps a `bd activity` stream alive by
// calling watchActivity repeatedly until the watcher's context is cancelled,
// and marks the WaitGroup done on exit so Stop can return.
//
// Every restart is preceded by a 5s pause, whether the stream ended with an
// error or with a clean exit. Without the pause, a bd binary that exits
// immediately with status 0 would be respawned in a tight loop.
func (w *ConvoyWatcher) run() {
	defer w.wg.Done()

	const restartDelay = 5 * time.Second

	for w.ctx.Err() == nil {
		// Start bd activity --follow --town --json and block until it ends.
		err := w.watchActivity()

		// A stream that ended because we are shutting down is not a failure;
		// leave without logging or sleeping.
		if w.ctx.Err() != nil {
			return
		}

		if err != nil {
			w.logger("convoy watcher: bd activity error: %v, restarting in 5s", err)
		} else {
			w.logger("convoy watcher: bd activity exited, restarting in 5s")
		}

		// Wait before retrying, but respect context cancellation.
		select {
		case <-w.ctx.Done():
			return
		case <-time.After(restartDelay):
		}
	}
}
// watchActivity runs `bd activity --follow --town --json` in townRoot and
// feeds every stdout line to processLine until the stream ends, a read error
// occurs, or the watcher's context is cancelled.
//
// It returns nil when the stream ended because of cancellation, a wrapped
// error when the pipe could not be created, the process could not be started,
// or reading failed, and otherwise the result of cmd.Wait (nil on a clean
// exit). The child process is always waited on before returning so it is
// reaped and its pipe is released.
func (w *ConvoyWatcher) watchActivity() error {
	cmd := exec.CommandContext(w.ctx, "bd", "activity", "--follow", "--town", "--json")
	cmd.Dir = w.townRoot
	cmd.Env = os.Environ() // Inherit PATH to find bd executable

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("creating stdout pipe: %w", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("starting bd activity: %w", err)
	}

	scanner := bufio.NewScanner(stdout)
	// Allow lines up to 1 MiB. The Scanner default of 64 KiB would abort the
	// whole stream on a single long JSON event.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		if w.ctx.Err() != nil {
			break
		}
		w.processLine(scanner.Text())
	}
	scanErr := scanner.Err()
	cancelled := w.ctx.Err() != nil

	// If we stopped reading early, the child may still be running (and may
	// block writing to a pipe nobody drains). Kill it so Wait cannot hang.
	// The error is ignored because the process may already have exited.
	if cancelled || scanErr != nil {
		_ = cmd.Process.Kill()
	}

	// Always reap the child; skipping Wait leaks the process entry, the pipe
	// and the goroutine CommandContext uses to watch the context.
	waitErr := cmd.Wait()

	switch {
	case cancelled || w.ctx.Err() != nil:
		// Shutdown: the "killed" status from Wait is expected, not an error.
		return nil
	case scanErr != nil:
		return fmt.Errorf("reading bd activity: %w", scanErr)
	default:
		return waitErr
	}
}
// processLine handles one line of `bd activity` output, expected to be a
// single JSON object. Blank and undecodable lines are dropped silently. For a
// "status" event whose new status is "closed", it logs the close, looks up
// the convoys tracking the issue, and runs a completion check on each one.
func (w *ConvoyWatcher) processLine(line string) {
	trimmed := strings.TrimSpace(line)
	if trimmed == "" {
		return
	}

	var ev bdActivityEvent
	if json.Unmarshal([]byte(trimmed), &ev) != nil {
		// Not valid JSON for an activity event; ignore the line.
		return
	}

	// Anything other than a transition to "closed" is irrelevant here.
	closed := ev.Type == "status" && ev.NewStatus == "closed"
	if !closed {
		return
	}
	w.logger("convoy watcher: detected close of %s", ev.IssueID)

	// Find the convoys that track the issue that just closed.
	tracking := w.getTrackingConvoys(ev.IssueID)
	count := len(tracking)
	if count == 0 {
		return
	}
	w.logger("convoy watcher: %s is tracked by %d convoy(s): %v", ev.IssueID, count, tracking)

	// Give every tracking convoy a chance to complete.
	for i := range tracking {
		w.checkConvoyCompletion(tracking[i])
	}
}
// getTrackingConvoys returns the IDs of convoys that track issueID, found by
// querying the dependencies table of <townRoot>/.beads/beads.db through the
// sqlite3 command-line tool.
//
// A dependency row of type 'tracks' matches when its depends_on_id equals
// issueID exactly or ends in ":<issueID>" (the external reference form).
//
// It returns nil when issueID is empty, when no rows match, or when the query
// or decoding fails. Failures are logged unless they were caused by the
// watcher shutting down.
func (w *ConvoyWatcher) getTrackingConvoys(issueID string) []string {
	// An empty ID would turn the LIKE clause into '%:' and match unrelated rows.
	if issueID == "" {
		return nil
	}

	dbPath := filepath.Join(w.townRoot, ".beads", "beads.db")

	// The sqlite3 CLI takes the statement as text, so the ID is embedded as a
	// SQL string literal with single quotes doubled.
	// NOTE(review): '%' and '_' inside issueID still act as LIKE wildcards;
	// this assumes issue IDs do not contain them. Confirm against bd's ID format.
	safeIssueID := strings.ReplaceAll(issueID, "'", "''")

	// Convoys use "tracks" type: convoy (issue_id) -> tracked issue (depends_on_id).
	query := fmt.Sprintf(`
		SELECT DISTINCT issue_id FROM dependencies
		WHERE type = 'tracks'
		AND (depends_on_id = '%s' OR depends_on_id LIKE '%%:%s')
	`, safeIssueID, safeIssueID)

	// The query is read-only, so tying it to the watcher context is safe and
	// keeps Stop from waiting on a stuck sqlite3 process.
	queryCmd := exec.CommandContext(w.ctx, "sqlite3", "-json", dbPath, query)
	queryCmd.Env = os.Environ() // Inherit PATH to find sqlite3 executable
	var stdout, stderr bytes.Buffer
	queryCmd.Stdout = &stdout
	queryCmd.Stderr = &stderr

	if err := queryCmd.Run(); err != nil {
		if w.ctx.Err() == nil {
			w.logger("convoy watcher: tracking query for %s failed: %v: %s",
				issueID, err, strings.TrimSpace(stderr.String()))
		}
		return nil
	}

	// sqlite3 -json prints nothing at all for an empty result set; treat that
	// as "no convoys" rather than as a JSON decoding failure.
	out := bytes.TrimSpace(stdout.Bytes())
	if len(out) == 0 {
		return nil
	}

	var results []struct {
		IssueID string `json:"issue_id"`
	}
	if err := json.Unmarshal(out, &results); err != nil {
		w.logger("convoy watcher: decoding tracking query result for %s: %v", issueID, err)
		return nil
	}

	convoyIDs := make([]string, 0, len(results))
	for _, r := range results {
		convoyIDs = append(convoyIDs, r.IssueID)
	}
	return convoyIDs
}
// checkConvoyCompletion runs `gt convoy check <convoyID>` for a convoy that is
// not already closed.
//
// It first reads the convoy's status from <townRoot>/.beads/beads.db with the
// sqlite3 command-line tool. It returns without running the check if the
// status query fails, the convoy is not found, or its status is "closed".
// Deciding whether the convoy is actually complete is left to `gt convoy
// check`. Query and check failures are logged; check output is logged unless
// it is empty or contains "No convoys ready".
func (w *ConvoyWatcher) checkConvoyCompletion(convoyID string) {
	dbPath := filepath.Join(w.townRoot, ".beads", "beads.db")

	// First check if the convoy is still open. The ID is embedded as a SQL
	// string literal with single quotes doubled.
	convoyQuery := fmt.Sprintf(`SELECT status FROM issues WHERE id = '%s'`,
		strings.ReplaceAll(convoyID, "'", "''"))

	// Read-only query: safe to cancel together with the watcher.
	queryCmd := exec.CommandContext(w.ctx, "sqlite3", "-json", dbPath, convoyQuery)
	queryCmd.Env = os.Environ() // Inherit PATH to find sqlite3 executable
	var stdout, stderr bytes.Buffer
	queryCmd.Stdout = &stdout
	queryCmd.Stderr = &stderr

	if err := queryCmd.Run(); err != nil {
		if w.ctx.Err() == nil {
			w.logger("convoy watcher: status query for %s failed: %v: %s",
				convoyID, err, strings.TrimSpace(stderr.String()))
		}
		return
	}

	// sqlite3 -json prints nothing when no row matches, i.e. the convoy ID is
	// not present in the issues table.
	out := bytes.TrimSpace(stdout.Bytes())
	if len(out) == 0 {
		return
	}

	var convoyStatus []struct {
		Status string `json:"status"`
	}
	if err := json.Unmarshal(out, &convoyStatus); err != nil {
		w.logger("convoy watcher: decoding status of %s: %v", convoyID, err)
		return
	}
	if len(convoyStatus) == 0 || convoyStatus[0].Status == "closed" {
		return // Unknown or already closed
	}

	// Run gt convoy check with specific convoy ID for targeted check.
	// This reuses the existing logic which handles notifications, etc.
	w.logger("convoy watcher: running completion check for %s", convoyID)

	// Deliberately not bound to w.ctx: the check may modify convoy state, so
	// it is allowed to finish rather than being killed during shutdown.
	checkCmd := exec.Command("gt", "convoy", "check", convoyID)
	checkCmd.Dir = w.townRoot
	checkCmd.Env = os.Environ() // Inherit PATH to find gt executable
	var checkStdout, checkStderr bytes.Buffer
	checkCmd.Stdout = &checkStdout
	checkCmd.Stderr = &checkStderr

	if err := checkCmd.Run(); err != nil {
		w.logger("convoy watcher: gt convoy check failed: %v: %s", err, checkStderr.String())
		return
	}

	if output := checkStdout.String(); output != "" && !strings.Contains(output, "No convoys ready") {
		w.logger("convoy watcher: %s", strings.TrimSpace(output))
	}
}