Witness: Verify POLECAT_DONE before stopping sessions (gt-ldk8)
- Add handling for POLECAT_DONE messages in processShutdownRequests()
- Track which polecats have signaled done (using SpawnedIssues with "done:" prefix)
- For LIFECYCLE:shutdown requests, wait for POLECAT_DONE before cleanup
- Add checkPendingCompletions() to nudge polecats with closed issues
- Add 10-minute timeout with force-kill after waiting for POLECAT_DONE
- Protects against losing MR submissions when Witness cleans up too early

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -361,6 +361,11 @@ func (m *Manager) checkAndProcess(w *Witness) {
|
|||||||
fmt.Printf("Shutdown request error: %v\n", err)
|
fmt.Printf("Shutdown request error: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check for polecats with closed issues that haven't signaled done
|
||||||
|
if err := m.checkPendingCompletions(w); err != nil {
|
||||||
|
fmt.Printf("Pending completions check error: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Auto-spawn for ready work (if enabled)
|
// Auto-spawn for ready work (if enabled)
|
||||||
if w.Config.AutoSpawn {
|
if w.Config.AutoSpawn {
|
||||||
if err := m.autoSpawnForReadyWork(w); err != nil {
|
if err := m.autoSpawnForReadyWork(w); err != nil {
|
||||||
@@ -639,7 +644,48 @@ func (m *Manager) processShutdownRequests(w *Witness) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, msg := range messages {
|
for _, msg := range messages {
|
||||||
// Look for LIFECYCLE requests
|
// Handle POLECAT_DONE messages (polecat has completed work and is ready for cleanup)
|
||||||
|
if strings.HasPrefix(msg.Subject, "POLECAT_DONE ") {
|
||||||
|
polecatName := extractPolecatNameFromDone(msg.Subject)
|
||||||
|
if polecatName == "" {
|
||||||
|
fmt.Printf("Warning: could not extract polecat name from POLECAT_DONE message\n")
|
||||||
|
m.ackMessage(msg.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Processing POLECAT_DONE from %s\n", polecatName)
|
||||||
|
|
||||||
|
// Record that this polecat has signaled done
|
||||||
|
m.recordDone(w, polecatName)
|
||||||
|
|
||||||
|
// Verify polecat state before cleanup
|
||||||
|
if err := m.verifyPolecatState(polecatName); err != nil {
|
||||||
|
fmt.Printf(" Verification failed: %v\n", err)
|
||||||
|
|
||||||
|
// Send nudge to polecat to fix state
|
||||||
|
if err := m.sendNudge(polecatName, err.Error()); err != nil {
|
||||||
|
fmt.Printf(" Warning: failed to send nudge: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't ack message - will retry on next check
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform cleanup
|
||||||
|
if err := m.cleanupPolecat(polecatName); err != nil {
|
||||||
|
fmt.Printf(" Cleanup error: %v\n", err)
|
||||||
|
// Don't ack message on error - will retry
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" Cleanup complete\n")
|
||||||
|
|
||||||
|
// Acknowledge the message
|
||||||
|
m.ackMessage(msg.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle LIFECYCLE shutdown requests (legacy/Deacon-managed)
|
||||||
if strings.Contains(msg.Subject, "LIFECYCLE:") && strings.Contains(msg.Subject, "shutdown") {
|
if strings.Contains(msg.Subject, "LIFECYCLE:") && strings.Contains(msg.Subject, "shutdown") {
|
||||||
fmt.Printf("Processing shutdown request: %s\n", msg.Subject)
|
fmt.Printf("Processing shutdown request: %s\n", msg.Subject)
|
||||||
|
|
||||||
@@ -653,6 +699,19 @@ func (m *Manager) processShutdownRequests(w *Witness) error {
|
|||||||
|
|
||||||
fmt.Printf(" Polecat: %s\n", polecatName)
|
fmt.Printf(" Polecat: %s\n", polecatName)
|
||||||
|
|
||||||
|
// SAFETY: Only cleanup if polecat has sent POLECAT_DONE
|
||||||
|
if !m.hasSentDone(w, polecatName) {
|
||||||
|
fmt.Printf(" Waiting for POLECAT_DONE from %s before cleanup\n", polecatName)
|
||||||
|
|
||||||
|
// Send reminder to polecat to complete shutdown sequence
|
||||||
|
if err := m.sendNudge(polecatName, "Please run 'gt done' to signal completion"); err != nil {
|
||||||
|
fmt.Printf(" Warning: failed to send nudge: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't ack message - will retry on next check
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Verify polecat state before cleanup
|
// Verify polecat state before cleanup
|
||||||
if err := m.verifyPolecatState(polecatName); err != nil {
|
if err := m.verifyPolecatState(polecatName); err != nil {
|
||||||
fmt.Printf(" Verification failed: %v\n", err)
|
fmt.Printf(" Verification failed: %v\n", err)
|
||||||
@@ -801,6 +860,202 @@ func extractPolecatName(body string) string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extractPolecatNameFromDone pulls the polecat name out of a POLECAT_DONE
// message subject of the form "POLECAT_DONE {name}".
//
// Everything after the "POLECAT_DONE " marker is returned with surrounding
// whitespace trimmed. A subject that does not start with the marker yields "".
func extractPolecatNameFromDone(subject string) string {
	const marker = "POLECAT_DONE "

	// Guard first: anything that is not a POLECAT_DONE subject has no name.
	if !strings.HasPrefix(subject, marker) {
		return ""
	}

	name := strings.TrimPrefix(subject, marker)
	return strings.TrimSpace(name)
}
|
||||||
|
|
||||||
|
// recordDone records that a polecat has sent POLECAT_DONE.
|
||||||
|
// Uses SpawnedIssues with "done:" prefix to track.
|
||||||
|
func (m *Manager) recordDone(w *Witness, polecatName string) {
|
||||||
|
doneKey := "done:" + polecatName
|
||||||
|
// Don't record duplicates
|
||||||
|
for _, entry := range w.SpawnedIssues {
|
||||||
|
if entry == doneKey {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.SpawnedIssues = append(w.SpawnedIssues, doneKey)
|
||||||
|
_ = m.saveState(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasSentDone checks if a polecat has sent POLECAT_DONE.
|
||||||
|
func (m *Manager) hasSentDone(w *Witness, polecatName string) bool {
|
||||||
|
doneKey := "done:" + polecatName
|
||||||
|
for _, entry := range w.SpawnedIssues {
|
||||||
|
if entry == doneKey {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// PendingCompletionTimeout bounds how long the witness waits for POLECAT_DONE
// once a polecat's issue has been seen closed. After this much time the
// polecat becomes eligible for forced cleanup.
const PendingCompletionTimeout time.Duration = 10 * time.Minute
|
||||||
|
|
||||||
|
// checkPendingCompletions checks for polecats with closed issues that haven't sent POLECAT_DONE.
|
||||||
|
// It nudges them to complete, and force-kills after timeout.
|
||||||
|
func (m *Manager) checkPendingCompletions(w *Witness) error {
|
||||||
|
polecatMgr := polecat.NewManager(m.rig, git.NewGit(m.rig.Path))
|
||||||
|
polecats, err := polecatMgr.List()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("listing polecats: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t := tmux.NewTmux()
|
||||||
|
sessMgr := session.NewManager(t, m.rig)
|
||||||
|
|
||||||
|
for _, p := range polecats {
|
||||||
|
// Skip if not running
|
||||||
|
running, _ := sessMgr.IsRunning(p.Name)
|
||||||
|
if !running {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if already signaled done
|
||||||
|
if m.hasSentDone(w, p.Name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the polecat's issue is closed
|
||||||
|
issueID := m.getPolecatIssue(p.Name, p.ClonePath)
|
||||||
|
if issueID == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
closed, err := m.isIssueClosed(issueID)
|
||||||
|
if err != nil || !closed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Issue is closed but polecat hasn't sent POLECAT_DONE
|
||||||
|
waitKey := "waiting:" + p.Name
|
||||||
|
waitingSince := m.getWaitingTimestamp(w, waitKey)
|
||||||
|
|
||||||
|
if waitingSince.IsZero() {
|
||||||
|
// First detection - record timestamp and nudge
|
||||||
|
fmt.Printf("Issue %s is closed but polecat %s hasn't signaled done\n", issueID, p.Name)
|
||||||
|
m.recordWaiting(w, waitKey)
|
||||||
|
if err := m.sendNudge(p.Name, "Your issue is closed. Please run 'gt done' to complete shutdown."); err != nil {
|
||||||
|
fmt.Printf(" Warning: failed to send nudge: %v\n", err)
|
||||||
|
}
|
||||||
|
} else if time.Since(waitingSince) > PendingCompletionTimeout {
|
||||||
|
// Timeout reached - force cleanup
|
||||||
|
fmt.Printf("Timeout waiting for POLECAT_DONE from %s, force cleaning up\n", p.Name)
|
||||||
|
|
||||||
|
// Verify state first (this still protects uncommitted work)
|
||||||
|
if err := m.verifyPolecatState(p.Name); err != nil {
|
||||||
|
fmt.Printf(" Cannot force cleanup - %v\n", err)
|
||||||
|
// Escalate to Mayor
|
||||||
|
m.escalateToMayor(p.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.cleanupPolecat(p.Name); err != nil {
|
||||||
|
fmt.Printf(" Force cleanup failed: %v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up tracking
|
||||||
|
m.clearWaiting(w, waitKey)
|
||||||
|
} else {
|
||||||
|
// Still waiting
|
||||||
|
elapsed := time.Since(waitingSince).Round(time.Minute)
|
||||||
|
remaining := (PendingCompletionTimeout - time.Since(waitingSince)).Round(time.Minute)
|
||||||
|
fmt.Printf("Waiting for POLECAT_DONE from %s (elapsed: %v, timeout in: %v)\n",
|
||||||
|
p.Name, elapsed, remaining)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPolecatIssue tries to determine which issue a polecat is working on.
|
||||||
|
func (m *Manager) getPolecatIssue(polecatName, polecatPath string) string {
|
||||||
|
// Try to read from state file
|
||||||
|
stateFile := filepath.Join(polecatPath, ".runtime", "state.json")
|
||||||
|
data, err := os.ReadFile(stateFile)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
var state struct {
|
||||||
|
IssueID string `json:"issue_id"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(data, &state); err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return state.IssueID
|
||||||
|
}
|
||||||
|
|
||||||
|
// isIssueClosed checks if an issue is closed.
|
||||||
|
func (m *Manager) isIssueClosed(issueID string) (bool, error) {
|
||||||
|
cmd := exec.Command("bd", "show", issueID, "--json")
|
||||||
|
cmd.Dir = m.workDir
|
||||||
|
|
||||||
|
var stdout, stderr bytes.Buffer
|
||||||
|
cmd.Stdout = &stdout
|
||||||
|
cmd.Stderr = &stderr
|
||||||
|
|
||||||
|
if err := cmd.Run(); err != nil {
|
||||||
|
return false, fmt.Errorf("%s", stderr.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse to check status
|
||||||
|
var issues []struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(stdout.Bytes(), &issues); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(issues) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return issues[0].Status == "closed", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getWaitingTimestamp retrieves when we started waiting for a polecat.
|
||||||
|
func (m *Manager) getWaitingTimestamp(w *Witness, key string) time.Time {
|
||||||
|
// Parse timestamps from SpawnedIssues with "waiting:{name}:{timestamp}" format
|
||||||
|
for _, entry := range w.SpawnedIssues {
|
||||||
|
if strings.HasPrefix(entry, key+":") {
|
||||||
|
tsStr := entry[len(key)+1:]
|
||||||
|
if ts, err := time.Parse(time.RFC3339, tsStr); err == nil {
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordWaiting records when we started waiting for a polecat to complete.
|
||||||
|
func (m *Manager) recordWaiting(w *Witness, key string) {
|
||||||
|
entry := fmt.Sprintf("%s:%s", key, time.Now().Format(time.RFC3339))
|
||||||
|
w.SpawnedIssues = append(w.SpawnedIssues, entry)
|
||||||
|
_ = m.saveState(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// clearWaiting removes the waiting timestamp for a polecat.
|
||||||
|
func (m *Manager) clearWaiting(w *Witness, key string) {
|
||||||
|
var filtered []string
|
||||||
|
for _, entry := range w.SpawnedIssues {
|
||||||
|
if !strings.HasPrefix(entry, key) {
|
||||||
|
filtered = append(filtered, entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.SpawnedIssues = filtered
|
||||||
|
_ = m.saveState(w)
|
||||||
|
}
|
||||||
|
|
||||||
// cleanupPolecat performs the full cleanup sequence for a transient polecat.
|
// cleanupPolecat performs the full cleanup sequence for a transient polecat.
|
||||||
// 1. Check for uncommitted work (stubbornly refuses to lose work)
|
// 1. Check for uncommitted work (stubbornly refuses to lose work)
|
||||||
// 2. Kill session
|
// 2. Kill session
|
||||||
|
|||||||
Reference in New Issue
Block a user