refactor(witness,refinery): ZFC-compliant state management
Remove state files from the witness and refinery managers, following the "Discover, Don't Track" principle. Tmux session existence is now the source of truth for running state (as with the deacon).

Changes:
- Add IsRunning(), which checks tmux HasSession
- Change Status() to return *tmux.SessionInfo
- Remove loadState/saveState/stateManager
- Simplify Start()/Stop() to not use state files
- Update CLI commands (witness/refinery/rig) for the new API
- Update tests to be ZFC-compliant

This fixes state-file divergence issues where witness/refinery could show "running" when the actual tmux session was dead.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Steve Yegge
parent
126ec84bb3
commit
5218102f49
@@ -337,6 +337,14 @@ func runRefineryStop(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RefineryStatusOutput is the JSON output format for refinery status.
type RefineryStatusOutput struct {
	// Running is the result of the manager's IsRunning check (tmux is the
	// source of truth for running state).
	Running bool `json:"running"`

	// RigName is the name of the rig whose refinery is being reported.
	RigName string `json:"rig_name"`

	// Session is the tmux session name; it is only set when session info
	// is available and is omitted from the JSON when empty.
	Session string `json:"session,omitempty"`

	// QueueLength is the number of items returned by the merge queue query.
	QueueLength int `json:"queue_length"`
}
|
||||
|
||||
func runRefineryStatus(cmd *cobra.Command, args []string) error {
|
||||
rigName := ""
|
||||
if len(args) > 0 {
|
||||
@@ -348,58 +356,42 @@ func runRefineryStatus(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
ref, err := mgr.Status()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting status: %w", err)
|
||||
}
|
||||
// ZFC: tmux is source of truth for running state
|
||||
running, _ := mgr.IsRunning()
|
||||
sessionInfo, _ := mgr.Status() // may be nil if not running
|
||||
|
||||
// Get queue from beads
|
||||
queue, _ := mgr.Queue()
|
||||
queueLen := len(queue)
|
||||
|
||||
// JSON output
|
||||
if refineryStatusJSON {
|
||||
output := RefineryStatusOutput{
|
||||
Running: running,
|
||||
RigName: rigName,
|
||||
QueueLength: queueLen,
|
||||
}
|
||||
if sessionInfo != nil {
|
||||
output.Session = sessionInfo.Name
|
||||
}
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(ref)
|
||||
return enc.Encode(output)
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
fmt.Printf("%s Refinery: %s\n\n", style.Bold.Render("⚙"), rigName)
|
||||
|
||||
stateStr := string(ref.State)
|
||||
switch ref.State {
|
||||
case refinery.StateRunning:
|
||||
stateStr = style.Bold.Render("● running")
|
||||
case refinery.StateStopped:
|
||||
stateStr = style.Dim.Render("○ stopped")
|
||||
case refinery.StatePaused:
|
||||
stateStr = style.Dim.Render("⏸ paused")
|
||||
}
|
||||
fmt.Printf(" State: %s\n", stateStr)
|
||||
|
||||
if ref.StartedAt != nil {
|
||||
fmt.Printf(" Started: %s\n", ref.StartedAt.Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
|
||||
if ref.CurrentMR != nil {
|
||||
fmt.Printf("\n %s\n", style.Bold.Render("Currently Processing:"))
|
||||
fmt.Printf(" Branch: %s\n", ref.CurrentMR.Branch)
|
||||
fmt.Printf(" Worker: %s\n", ref.CurrentMR.Worker)
|
||||
if ref.CurrentMR.IssueID != "" {
|
||||
fmt.Printf(" Issue: %s\n", ref.CurrentMR.IssueID)
|
||||
if running {
|
||||
fmt.Printf(" State: %s\n", style.Bold.Render("● running"))
|
||||
if sessionInfo != nil {
|
||||
fmt.Printf(" Session: %s\n", sessionInfo.Name)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" State: %s\n", style.Dim.Render("○ stopped"))
|
||||
}
|
||||
|
||||
// Get queue length
|
||||
queue, _ := mgr.Queue()
|
||||
pendingCount := 0
|
||||
for _, item := range queue {
|
||||
if item.Position > 0 { // Not currently processing
|
||||
pendingCount++
|
||||
}
|
||||
}
|
||||
fmt.Printf("\n Queue: %d pending\n", pendingCount)
|
||||
|
||||
if ref.LastMergeAt != nil {
|
||||
fmt.Printf(" Last merge: %s\n", ref.LastMergeAt.Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
fmt.Printf("\n Queue: %d pending\n", queueLen)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -977,8 +977,7 @@ func runRigShutdown(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 2. Stop the refinery
|
||||
refMgr := refinery.NewManager(r)
|
||||
refStatus, err := refMgr.Status()
|
||||
if err == nil && refStatus.State == refinery.StateRunning {
|
||||
if running, _ := refMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping refinery...\n")
|
||||
if err := refMgr.Stop(); err != nil {
|
||||
errors = append(errors, fmt.Sprintf("refinery: %v", err))
|
||||
@@ -987,8 +986,7 @@ func runRigShutdown(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 3. Stop the witness
|
||||
witMgr := witness.NewManager(r)
|
||||
witStatus, err := witMgr.Status()
|
||||
if err == nil && witStatus.State == witness.StateRunning {
|
||||
if running, _ := witMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping witness...\n")
|
||||
if err := witMgr.Stop(); err != nil {
|
||||
errors = append(errors, fmt.Sprintf("witness: %v", err))
|
||||
@@ -1077,14 +1075,9 @@ func runRigStatus(cmd *cobra.Command, args []string) error {
|
||||
fmt.Printf("%s\n", style.Bold.Render("Witness"))
|
||||
witnessSession := fmt.Sprintf("gt-%s-witness", rigName)
|
||||
witnessRunning, _ := t.HasSession(witnessSession)
|
||||
witMgr := witness.NewManager(r)
|
||||
witStatus, _ := witMgr.Status()
|
||||
_ = witness.NewManager(r) // silence unused warning, manager created for consistency
|
||||
if witnessRunning {
|
||||
fmt.Printf(" %s running", style.Success.Render("●"))
|
||||
if witStatus != nil && witStatus.StartedAt != nil {
|
||||
fmt.Printf(" (uptime: %s)", formatDuration(time.Since(*witStatus.StartedAt)))
|
||||
}
|
||||
fmt.Printf("\n")
|
||||
fmt.Printf(" %s running\n", style.Success.Render("●"))
|
||||
} else {
|
||||
fmt.Printf(" %s stopped\n", style.Dim.Render("○"))
|
||||
}
|
||||
@@ -1092,16 +1085,10 @@ func runRigStatus(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// Refinery status
|
||||
fmt.Printf("%s\n", style.Bold.Render("Refinery"))
|
||||
refinerySession := fmt.Sprintf("gt-%s-refinery", rigName)
|
||||
refineryRunning, _ := t.HasSession(refinerySession)
|
||||
refMgr := refinery.NewManager(r)
|
||||
refStatus, _ := refMgr.Status()
|
||||
refineryRunning, _ := refMgr.IsRunning()
|
||||
if refineryRunning {
|
||||
fmt.Printf(" %s running", style.Success.Render("●"))
|
||||
if refStatus != nil && refStatus.StartedAt != nil {
|
||||
fmt.Printf(" (uptime: %s)", formatDuration(time.Since(*refStatus.StartedAt)))
|
||||
}
|
||||
fmt.Printf("\n")
|
||||
fmt.Printf(" %s running\n", style.Success.Render("●"))
|
||||
// Show queue size
|
||||
queue, err := refMgr.Queue()
|
||||
if err == nil && len(queue) > 0 {
|
||||
@@ -1254,8 +1241,7 @@ func runRigStop(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 2. Stop the refinery
|
||||
refMgr := refinery.NewManager(r)
|
||||
refStatus, err := refMgr.Status()
|
||||
if err == nil && refStatus.State == refinery.StateRunning {
|
||||
if running, _ := refMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping refinery...\n")
|
||||
if err := refMgr.Stop(); err != nil {
|
||||
errors = append(errors, fmt.Sprintf("refinery: %v", err))
|
||||
@@ -1264,8 +1250,7 @@ func runRigStop(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 3. Stop the witness
|
||||
witMgr := witness.NewManager(r)
|
||||
witStatus, err := witMgr.Status()
|
||||
if err == nil && witStatus.State == witness.StateRunning {
|
||||
if running, _ := witMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping witness...\n")
|
||||
if err := witMgr.Stop(); err != nil {
|
||||
errors = append(errors, fmt.Sprintf("witness: %v", err))
|
||||
@@ -1387,8 +1372,7 @@ func runRigRestart(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 2. Stop the refinery
|
||||
refMgr := refinery.NewManager(r)
|
||||
refStatus, err := refMgr.Status()
|
||||
if err == nil && refStatus.State == refinery.StateRunning {
|
||||
if running, _ := refMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping refinery...\n")
|
||||
if err := refMgr.Stop(); err != nil {
|
||||
stopErrors = append(stopErrors, fmt.Sprintf("refinery: %v", err))
|
||||
@@ -1397,8 +1381,7 @@ func runRigRestart(cmd *cobra.Command, args []string) error {
|
||||
|
||||
// 3. Stop the witness
|
||||
witMgr := witness.NewManager(r)
|
||||
witStatus, err := witMgr.Status()
|
||||
if err == nil && witStatus.State == witness.StateRunning {
|
||||
if running, _ := witMgr.IsRunning(); running {
|
||||
fmt.Printf(" Stopping witness...\n")
|
||||
if err := witMgr.Stop(); err != nil {
|
||||
stopErrors = append(stopErrors, fmt.Sprintf("witness: %v", err))
|
||||
|
||||
@@ -218,65 +218,65 @@ func runWitnessStop(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WitnessStatusOutput is the JSON output format for witness status.
type WitnessStatusOutput struct {
	// Running is the result of the manager's IsRunning check (tmux is the
	// source of truth for running state).
	Running bool `json:"running"`

	// RigName is the name of the rig whose witness is being reported.
	RigName string `json:"rig_name"`

	// Session is the tmux session name; it is only set when session info
	// is available and is omitted from the JSON when empty.
	Session string `json:"session,omitempty"`

	// MonitoredPolecats lists the rig's polecats (taken from the rig, not
	// from a state file); omitted from the JSON when empty.
	MonitoredPolecats []string `json:"monitored_polecats,omitempty"`
}
|
||||
|
||||
func runWitnessStatus(cmd *cobra.Command, args []string) error {
|
||||
rigName := args[0]
|
||||
|
||||
mgr, err := getWitnessManager(rigName)
|
||||
// Get rig for polecat info
|
||||
_, r, err := getRig(rigName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w, err := mgr.Status()
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting status: %w", err)
|
||||
}
|
||||
mgr := witness.NewManager(r)
|
||||
|
||||
// Check actual tmux session state (more reliable than state file)
|
||||
t := tmux.NewTmux()
|
||||
sessionName := witnessSessionName(rigName)
|
||||
sessionRunning, _ := t.HasSession(sessionName)
|
||||
// ZFC: tmux is source of truth for running state
|
||||
running, _ := mgr.IsRunning()
|
||||
sessionInfo, _ := mgr.Status() // may be nil if not running
|
||||
|
||||
// Reconcile state: tmux session is the source of truth for background mode
|
||||
if sessionRunning && w.State != witness.StateRunning {
|
||||
w.State = witness.StateRunning
|
||||
} else if !sessionRunning && w.State == witness.StateRunning {
|
||||
w.State = witness.StateStopped
|
||||
}
|
||||
// Polecats come from rig config, not state file
|
||||
polecats := r.Polecats
|
||||
|
||||
// JSON output
|
||||
if witnessStatusJSON {
|
||||
output := WitnessStatusOutput{
|
||||
Running: running,
|
||||
RigName: rigName,
|
||||
MonitoredPolecats: polecats,
|
||||
}
|
||||
if sessionInfo != nil {
|
||||
output.Session = sessionInfo.Name
|
||||
}
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(w)
|
||||
return enc.Encode(output)
|
||||
}
|
||||
|
||||
// Human-readable output
|
||||
fmt.Printf("%s Witness: %s\n\n", style.Bold.Render(AgentTypeIcons[AgentWitness]), rigName)
|
||||
|
||||
stateStr := string(w.State)
|
||||
switch w.State {
|
||||
case witness.StateRunning:
|
||||
stateStr = style.Bold.Render("● running")
|
||||
case witness.StateStopped:
|
||||
stateStr = style.Dim.Render("○ stopped")
|
||||
case witness.StatePaused:
|
||||
stateStr = style.Dim.Render("⏸ paused")
|
||||
}
|
||||
fmt.Printf(" State: %s\n", stateStr)
|
||||
if sessionRunning {
|
||||
fmt.Printf(" Session: %s\n", sessionName)
|
||||
}
|
||||
|
||||
if w.StartedAt != nil {
|
||||
fmt.Printf(" Started: %s\n", w.StartedAt.Format("2006-01-02 15:04:05"))
|
||||
if running {
|
||||
fmt.Printf(" State: %s\n", style.Bold.Render("● running"))
|
||||
if sessionInfo != nil {
|
||||
fmt.Printf(" Session: %s\n", sessionInfo.Name)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" State: %s\n", style.Dim.Render("○ stopped"))
|
||||
}
|
||||
|
||||
// Show monitored polecats
|
||||
fmt.Printf("\n %s\n", style.Bold.Render("Monitored Polecats:"))
|
||||
if len(w.MonitoredPolecats) == 0 {
|
||||
if len(polecats) == 0 {
|
||||
fmt.Printf(" %s\n", style.Dim.Render("(none)"))
|
||||
} else {
|
||||
for _, p := range w.MonitoredPolecats {
|
||||
for _, p := range polecats {
|
||||
fmt.Printf(" • %s\n", p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package refinery
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -52,89 +51,50 @@ func (m *Manager) SetOutput(w io.Writer) {
|
||||
m.output = w
|
||||
}
|
||||
|
||||
// stateFile returns the path to the refinery state file.
|
||||
func (m *Manager) stateFile() string {
|
||||
return filepath.Join(m.rig.Path, ".runtime", "refinery.json")
|
||||
}
|
||||
|
||||
// SessionName returns the tmux session name for this refinery.
|
||||
func (m *Manager) SessionName() string {
|
||||
return fmt.Sprintf("gt-%s-refinery", m.rig.Name)
|
||||
}
|
||||
|
||||
// loadState loads refinery state from disk.
|
||||
func (m *Manager) loadState() (*Refinery, error) {
|
||||
data, err := os.ReadFile(m.stateFile())
|
||||
// IsRunning checks if the refinery session is active.
|
||||
// ZFC: tmux session existence is the source of truth.
|
||||
func (m *Manager) IsRunning() (bool, error) {
|
||||
t := tmux.NewTmux()
|
||||
return t.HasSession(m.SessionName())
|
||||
}
|
||||
|
||||
// Status returns information about the refinery session.
|
||||
// ZFC-compliant: tmux session is the source of truth.
|
||||
func (m *Manager) Status() (*tmux.SessionInfo, error) {
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
|
||||
running, err := t.HasSession(sessionID)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return &Refinery{
|
||||
RigName: m.rig.Name,
|
||||
State: StateStopped,
|
||||
}, nil
|
||||
}
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("checking session: %w", err)
|
||||
}
|
||||
if !running {
|
||||
return nil, ErrNotRunning
|
||||
}
|
||||
|
||||
var ref Refinery
|
||||
if err := json.Unmarshal(data, &ref); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ref, nil
|
||||
}
|
||||
|
||||
// saveState persists refinery state to disk using atomic write.
|
||||
func (m *Manager) saveState(ref *Refinery) error {
|
||||
dir := filepath.Dir(m.stateFile())
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return util.AtomicWriteJSON(m.stateFile(), ref)
|
||||
}
|
||||
|
||||
// Status returns the current refinery status.
|
||||
// ZFC-compliant: trusts agent-reported state, no PID/tmux inference.
|
||||
// The daemon reads agent bead state for liveness checks.
|
||||
func (m *Manager) Status() (*Refinery, error) {
|
||||
return m.loadState()
|
||||
return t.GetSessionInfo(sessionID)
|
||||
}
|
||||
|
||||
// Start starts the refinery.
|
||||
// If foreground is true, runs in the current process (blocking) using the Go-based polling loop.
|
||||
// If foreground is true, returns an error (foreground mode deprecated).
|
||||
// Otherwise, spawns a Claude agent in a tmux session to process the merge queue.
|
||||
// The agentOverride parameter allows specifying an agent alias to use instead of the town default.
|
||||
// ZFC-compliant: no state file, tmux session is source of truth.
|
||||
func (m *Manager) Start(foreground bool, agentOverride string) error {
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
|
||||
if foreground {
|
||||
// In foreground mode, check tmux session (no PID inference per ZFC)
|
||||
// Use IsClaudeRunning for robust detection (see gastown#566)
|
||||
if running, _ := t.HasSession(sessionID); running && t.IsClaudeRunning(sessionID) {
|
||||
return ErrAlreadyRunning
|
||||
}
|
||||
|
||||
// Running in foreground - update state and run the Go-based polling loop
|
||||
now := time.Now()
|
||||
ref.State = StateRunning
|
||||
ref.StartedAt = &now
|
||||
ref.PID = 0 // No longer track PID (ZFC)
|
||||
|
||||
if err := m.saveState(ref); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Run the processing loop (blocking)
|
||||
return m.run(ref)
|
||||
// Foreground mode is deprecated - the Refinery agent handles merge processing
|
||||
return fmt.Errorf("foreground mode is deprecated; use background mode (remove --foreground flag)")
|
||||
}
|
||||
|
||||
// Background mode: check if session already exists
|
||||
// Check if session already exists
|
||||
running, _ := t.HasSession(sessionID)
|
||||
if running {
|
||||
// Session exists - check if Claude is actually running (healthy vs zombie)
|
||||
@@ -213,16 +173,6 @@ func (m *Manager) Start(foreground bool, agentOverride string) error {
|
||||
theme := tmux.AssignTheme(m.rig.Name)
|
||||
_ = t.ConfigureGasTownSession(sessionID, theme, m.rig.Name, "refinery", "refinery")
|
||||
|
||||
// Update state to running
|
||||
now := time.Now()
|
||||
ref.State = StateRunning
|
||||
ref.StartedAt = &now
|
||||
ref.PID = 0 // Claude agent doesn't have a PID we track
|
||||
if err := m.saveState(ref); err != nil {
|
||||
_ = t.KillSession(sessionID) // best-effort cleanup on state save failure
|
||||
return fmt.Errorf("saving state: %w", err)
|
||||
}
|
||||
|
||||
// Wait for Claude to start and show its prompt - fatal if Claude fails to launch
|
||||
// WaitForRuntimeReady waits for the runtime to be ready
|
||||
if err := t.WaitForRuntimeReady(sessionID, runtimeConfig, constants.ClaudeStartTimeout); err != nil {
|
||||
@@ -256,37 +206,24 @@ func (m *Manager) Start(foreground bool, agentOverride string) error {
|
||||
}
|
||||
|
||||
// Stop stops the refinery.
|
||||
// ZFC-compliant: tmux session is the source of truth.
|
||||
func (m *Manager) Stop() error {
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if tmux session exists
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
sessionRunning, _ := t.HasSession(sessionID)
|
||||
|
||||
// If neither state nor session indicates running, it's not running
|
||||
if ref.State != StateRunning && !sessionRunning {
|
||||
// Check if tmux session exists
|
||||
running, _ := t.HasSession(sessionID)
|
||||
if !running {
|
||||
return ErrNotRunning
|
||||
}
|
||||
|
||||
// Kill tmux session if it exists (best-effort: may already be dead)
|
||||
if sessionRunning {
|
||||
_ = t.KillSession(sessionID)
|
||||
}
|
||||
|
||||
// Note: No PID-based stop per ZFC - tmux session kill is sufficient
|
||||
|
||||
ref.State = StateStopped
|
||||
ref.PID = 0
|
||||
|
||||
return m.saveState(ref)
|
||||
// Kill the tmux session
|
||||
return t.KillSession(sessionID)
|
||||
}
|
||||
|
||||
// Queue returns the current merge queue.
|
||||
// Uses beads merge-request issues as the source of truth (not git branches).
|
||||
// ZFC-compliant: beads is the source of truth, no state file.
|
||||
func (m *Manager) Queue() ([]QueueItem, error) {
|
||||
// Query beads for open merge-request type issues
|
||||
// BeadsPath() returns the git-synced beads location
|
||||
@@ -300,25 +237,6 @@ func (m *Manager) Queue() ([]QueueItem, error) {
|
||||
return nil, fmt.Errorf("querying merge queue from beads: %w", err)
|
||||
}
|
||||
|
||||
// Load any current processing state
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Build queue items
|
||||
var items []QueueItem
|
||||
pos := 1
|
||||
|
||||
// Add current processing item
|
||||
if ref.CurrentMR != nil {
|
||||
items = append(items, QueueItem{
|
||||
Position: 0, // 0 = currently processing
|
||||
MR: ref.CurrentMR,
|
||||
Age: formatAge(ref.CurrentMR.CreatedAt),
|
||||
})
|
||||
}
|
||||
|
||||
// Score and sort issues by priority score (highest first)
|
||||
now := time.Now()
|
||||
type scoredIssue struct {
|
||||
@@ -336,13 +254,11 @@ func (m *Manager) Queue() ([]QueueItem, error) {
|
||||
})
|
||||
|
||||
// Convert scored issues to queue items
|
||||
var items []QueueItem
|
||||
pos := 1
|
||||
for _, s := range scored {
|
||||
mr := m.issueToMR(s.issue)
|
||||
if mr != nil {
|
||||
// Skip if this is the currently processing MR
|
||||
if ref.CurrentMR != nil && ref.CurrentMR.ID == mr.ID {
|
||||
continue
|
||||
}
|
||||
items = append(items, QueueItem{
|
||||
Position: pos,
|
||||
MR: mr,
|
||||
@@ -484,12 +400,10 @@ func (m *Manager) ProcessMR(mr *MergeRequest) MergeResult {
|
||||
// completeMR marks an MR as complete.
|
||||
// For success, pass closeReason (e.g., CloseReasonMerged).
|
||||
// For failures that should return to open, pass empty closeReason.
|
||||
// ZFC-compliant: no state file, just updates MR and emits events.
|
||||
// Deprecated: The Refinery agent handles merge processing (ZFC #5).
|
||||
func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg string) {
|
||||
ref, _ := m.loadState()
|
||||
mr.Error = errMsg
|
||||
ref.CurrentMR = nil
|
||||
|
||||
now := time.Now()
|
||||
actor := fmt.Sprintf("%s/refinery", m.rig.Name)
|
||||
|
||||
if closeReason != "" {
|
||||
@@ -498,10 +412,7 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s
|
||||
// Log error but continue - this shouldn't happen
|
||||
_, _ = fmt.Fprintf(m.output, "Warning: failed to close MR: %v\n", err)
|
||||
}
|
||||
switch closeReason {
|
||||
case CloseReasonMerged:
|
||||
ref.LastMergeAt = &now
|
||||
case CloseReasonSuperseded:
|
||||
if closeReason == CloseReasonSuperseded {
|
||||
// Emit merge_skipped event
|
||||
_ = events.LogFeed(events.TypeMergeSkipped, actor, events.MergePayload(mr.ID, mr.Worker, mr.Branch, "superseded"))
|
||||
}
|
||||
@@ -512,8 +423,6 @@ func (m *Manager) completeMR(mr *MergeRequest, closeReason CloseReason, errMsg s
|
||||
_, _ = fmt.Fprintf(m.output, "Warning: failed to reopen MR: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
_ = m.saveState(ref) // non-fatal: state file update
|
||||
}
|
||||
|
||||
// runTests executes the test command.
|
||||
@@ -634,26 +543,11 @@ var (
|
||||
ErrMRNotFailed = errors.New("merge request has not failed")
|
||||
)
|
||||
|
||||
// GetMR returns a merge request by ID from the state.
|
||||
// GetMR returns a merge request by ID.
|
||||
// ZFC-compliant: delegates to FindMR which uses beads as source of truth.
|
||||
// Deprecated: Use FindMR directly for more flexible matching.
|
||||
func (m *Manager) GetMR(id string) (*MergeRequest, error) {
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Check if it's the current MR
|
||||
if ref.CurrentMR != nil && ref.CurrentMR.ID == id {
|
||||
return ref.CurrentMR, nil
|
||||
}
|
||||
|
||||
// Check pending MRs
|
||||
if ref.PendingMRs != nil {
|
||||
if mr, ok := ref.PendingMRs[id]; ok {
|
||||
return mr, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, ErrMRNotFound
|
||||
return m.FindMR(id)
|
||||
}
|
||||
|
||||
// FindMR finds a merge request by ID or branch name in the queue.
|
||||
@@ -684,60 +578,19 @@ func (m *Manager) FindMR(idOrBranch string) (*MergeRequest, error) {
|
||||
return nil, ErrMRNotFound
|
||||
}
|
||||
|
||||
// Retry resets a failed merge request so it can be processed again.
|
||||
// The processNow parameter is deprecated - the Refinery agent handles processing.
|
||||
// Clearing the error is sufficient; the agent will pick up the MR in its next patrol cycle.
|
||||
func (m *Manager) Retry(id string, processNow bool) error {
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Find the MR
|
||||
var mr *MergeRequest
|
||||
if ref.PendingMRs != nil {
|
||||
mr = ref.PendingMRs[id]
|
||||
}
|
||||
if mr == nil {
|
||||
return ErrMRNotFound
|
||||
}
|
||||
|
||||
// Verify it's in a failed state (open with an error)
|
||||
if mr.Status != MROpen || mr.Error == "" {
|
||||
return ErrMRNotFailed
|
||||
}
|
||||
|
||||
// Clear the error to mark as ready for retry
|
||||
mr.Error = ""
|
||||
|
||||
// Save the state
|
||||
if err := m.saveState(ref); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Note: processNow is deprecated (ZFC #5).
|
||||
// The Refinery agent handles merge processing.
|
||||
// It will pick up this MR in its next patrol cycle.
|
||||
if processNow {
|
||||
_, _ = fmt.Fprintln(m.output, "Note: --now is deprecated. The Refinery agent will process this MR in its next patrol cycle.")
|
||||
}
|
||||
|
||||
// Retry is deprecated - the Refinery agent handles retry logic autonomously.
|
||||
// ZFC-compliant: no state file, agent uses beads issue status.
|
||||
// The agent will automatically retry failed MRs in its patrol cycle.
|
||||
func (m *Manager) Retry(_ string, _ bool) error {
|
||||
_, _ = fmt.Fprintln(m.output, "Note: Retry is deprecated. The Refinery agent handles retries autonomously via beads.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterMR adds a merge request to the pending queue.
|
||||
func (m *Manager) RegisterMR(mr *MergeRequest) error {
|
||||
ref, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ref.PendingMRs == nil {
|
||||
ref.PendingMRs = make(map[string]*MergeRequest)
|
||||
}
|
||||
|
||||
ref.PendingMRs[mr.ID] = mr
|
||||
return m.saveState(ref)
|
||||
// RegisterMR is deprecated - MRs are registered via beads merge-request issues.
|
||||
// ZFC-compliant: beads is the source of truth, not state file.
|
||||
// Use 'gt mr create' or create a merge-request type bead directly.
|
||||
func (m *Manager) RegisterMR(_ *MergeRequest) error {
|
||||
return fmt.Errorf("RegisterMR is deprecated: use beads to create merge-request issues")
|
||||
}
|
||||
|
||||
// RejectMR manually rejects a merge request.
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
package refinery
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/gastown/internal/rig"
|
||||
)
|
||||
@@ -28,145 +26,96 @@ func setupTestManager(t *testing.T) (*Manager, string) {
|
||||
return NewManager(r), rigPath
|
||||
}
|
||||
|
||||
func TestManager_GetMR(t *testing.T) {
|
||||
func TestManager_SessionName(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Create a test MR in the pending queue
|
||||
mr := &MergeRequest{
|
||||
ID: "gt-mr-abc123",
|
||||
Branch: "polecat/Toast/gt-xyz",
|
||||
Worker: "Toast",
|
||||
IssueID: "gt-xyz",
|
||||
Status: MROpen,
|
||||
Error: "test failure",
|
||||
want := "gt-testrig-refinery"
|
||||
got := mgr.SessionName()
|
||||
if got != want {
|
||||
t.Errorf("SessionName() = %s, want %s", got, want)
|
||||
}
|
||||
|
||||
if err := mgr.RegisterMR(mr); err != nil {
|
||||
t.Fatalf("RegisterMR: %v", err)
|
||||
}
|
||||
|
||||
t.Run("find existing MR", func(t *testing.T) {
|
||||
found, err := mgr.GetMR("gt-mr-abc123")
|
||||
if err != nil {
|
||||
t.Errorf("GetMR() unexpected error: %v", err)
|
||||
}
|
||||
if found == nil {
|
||||
t.Fatal("GetMR() returned nil")
|
||||
}
|
||||
if found.ID != mr.ID {
|
||||
t.Errorf("GetMR() ID = %s, want %s", found.ID, mr.ID)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("MR not found", func(t *testing.T) {
|
||||
_, err := mgr.GetMR("nonexistent-mr")
|
||||
if err != ErrMRNotFound {
|
||||
t.Errorf("GetMR() error = %v, want %v", err, ErrMRNotFound)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_Retry(t *testing.T) {
|
||||
t.Run("retry failed MR clears error", func(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
func TestManager_IsRunning_NoSession(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Create a failed MR
|
||||
mr := &MergeRequest{
|
||||
ID: "gt-mr-failed",
|
||||
Branch: "polecat/Toast/gt-xyz",
|
||||
Worker: "Toast",
|
||||
Status: MROpen,
|
||||
Error: "merge conflict",
|
||||
}
|
||||
|
||||
if err := mgr.RegisterMR(mr); err != nil {
|
||||
t.Fatalf("RegisterMR: %v", err)
|
||||
}
|
||||
|
||||
// Retry without processing
|
||||
err := mgr.Retry("gt-mr-failed", false)
|
||||
if err != nil {
|
||||
t.Errorf("Retry() unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// Verify error was cleared
|
||||
found, _ := mgr.GetMR("gt-mr-failed")
|
||||
if found.Error != "" {
|
||||
t.Errorf("Retry() error not cleared, got %s", found.Error)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("retry non-failed MR fails", func(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Create a successful MR (no error)
|
||||
mr := &MergeRequest{
|
||||
ID: "gt-mr-success",
|
||||
Branch: "polecat/Toast/gt-abc",
|
||||
Worker: "Toast",
|
||||
Status: MROpen,
|
||||
Error: "", // No error
|
||||
}
|
||||
|
||||
if err := mgr.RegisterMR(mr); err != nil {
|
||||
t.Fatalf("RegisterMR: %v", err)
|
||||
}
|
||||
|
||||
err := mgr.Retry("gt-mr-success", false)
|
||||
if err != ErrMRNotFailed {
|
||||
t.Errorf("Retry() error = %v, want %v", err, ErrMRNotFailed)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("retry nonexistent MR fails", func(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
err := mgr.Retry("nonexistent", false)
|
||||
if err != ErrMRNotFound {
|
||||
t.Errorf("Retry() error = %v, want %v", err, ErrMRNotFound)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_RegisterMR(t *testing.T) {
|
||||
mgr, rigPath := setupTestManager(t)
|
||||
|
||||
mr := &MergeRequest{
|
||||
ID: "gt-mr-new",
|
||||
Branch: "polecat/Cheedo/gt-123",
|
||||
Worker: "Cheedo",
|
||||
IssueID: "gt-123",
|
||||
TargetBranch: "main",
|
||||
CreatedAt: time.Now(),
|
||||
Status: MROpen,
|
||||
}
|
||||
|
||||
if err := mgr.RegisterMR(mr); err != nil {
|
||||
t.Fatalf("RegisterMR: %v", err)
|
||||
}
|
||||
|
||||
// Verify it was saved to disk
|
||||
stateFile := filepath.Join(rigPath, ".runtime", "refinery.json")
|
||||
data, err := os.ReadFile(stateFile)
|
||||
// Without a tmux session, IsRunning should return false
|
||||
// Note: this test doesn't create a tmux session, so it tests the "not running" case
|
||||
running, err := mgr.IsRunning()
|
||||
if err != nil {
|
||||
t.Fatalf("reading state file: %v", err)
|
||||
// If tmux server isn't running, HasSession returns an error
|
||||
// This is expected in test environments without tmux
|
||||
t.Logf("IsRunning returned error (expected without tmux): %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
var ref Refinery
|
||||
if err := json.Unmarshal(data, &ref); err != nil {
|
||||
t.Fatalf("unmarshal state: %v", err)
|
||||
}
|
||||
|
||||
if ref.PendingMRs == nil {
|
||||
t.Fatal("PendingMRs is nil")
|
||||
}
|
||||
|
||||
saved, ok := ref.PendingMRs["gt-mr-new"]
|
||||
if !ok {
|
||||
t.Fatal("MR not found in PendingMRs")
|
||||
}
|
||||
|
||||
if saved.Worker != "Cheedo" {
|
||||
t.Errorf("saved MR worker = %s, want Cheedo", saved.Worker)
|
||||
if running {
|
||||
t.Error("IsRunning() = true, want false (no session created)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_Status_NotRunning(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Without a tmux session, Status should return ErrNotRunning
|
||||
_, err := mgr.Status()
|
||||
if err == nil {
|
||||
t.Error("Status() expected error when not running")
|
||||
}
|
||||
// May return ErrNotRunning or a tmux server error
|
||||
t.Logf("Status returned error (expected): %v", err)
|
||||
}
|
||||
|
||||
func TestManager_Queue_NoBeads(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Queue returns error when no beads database exists
|
||||
// This is expected - beads requires initialization
|
||||
_, err := mgr.Queue()
|
||||
if err == nil {
|
||||
// If beads is somehow available, queue should be empty
|
||||
t.Log("Queue() succeeded unexpectedly (beads may be available)")
|
||||
return
|
||||
}
|
||||
// Error is expected when beads isn't initialized
|
||||
t.Logf("Queue() returned error (expected without beads): %v", err)
|
||||
}
|
||||
|
||||
func TestManager_FindMR_NoBeads(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// FindMR returns error when no beads database exists
|
||||
_, err := mgr.FindMR("nonexistent-mr")
|
||||
if err == nil {
|
||||
t.Error("FindMR() expected error")
|
||||
}
|
||||
// Any error is acceptable when beads isn't initialized
|
||||
t.Logf("FindMR() returned error (expected): %v", err)
|
||||
}
|
||||
|
||||
func TestManager_RegisterMR_Deprecated(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
mr := &MergeRequest{
|
||||
ID: "gt-mr-test",
|
||||
Branch: "polecat/Test/gt-123",
|
||||
Worker: "Test",
|
||||
Status: MROpen,
|
||||
}
|
||||
|
||||
// RegisterMR should return an error indicating deprecation
|
||||
err := mgr.RegisterMR(mr)
|
||||
if err == nil {
|
||||
t.Error("RegisterMR() expected error (deprecated)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_Retry_Deprecated(t *testing.T) {
|
||||
mgr, _ := setupTestManager(t)
|
||||
|
||||
// Retry is deprecated and should not error, just print a message
|
||||
err := mgr.Retry("any-id", false)
|
||||
if err != nil {
|
||||
t.Errorf("Retry() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/gastown/internal/agent"
|
||||
"github.com/steveyegge/gastown/internal/beads"
|
||||
"github.com/steveyegge/gastown/internal/claude"
|
||||
"github.com/steveyegge/gastown/internal/config"
|
||||
@@ -26,10 +25,10 @@ var (
|
||||
)
|
||||
|
||||
// Manager handles witness lifecycle and monitoring operations.
|
||||
// ZFC-compliant: tmux session is the source of truth for running state.
|
||||
type Manager struct {
|
||||
rig *rig.Rig
|
||||
workDir string
|
||||
stateManager *agent.StateManager[Witness]
|
||||
rig *rig.Rig
|
||||
workDir string
|
||||
}
|
||||
|
||||
// NewManager creates a new witness manager for a rig.
|
||||
@@ -37,28 +36,14 @@ func NewManager(r *rig.Rig) *Manager {
|
||||
return &Manager{
|
||||
rig: r,
|
||||
workDir: r.Path,
|
||||
stateManager: agent.NewStateManager[Witness](r.Path, "witness.json", func() *Witness {
|
||||
return &Witness{
|
||||
RigName: r.Name,
|
||||
State: StateStopped,
|
||||
}
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// stateFile returns the path to the witness state file.
|
||||
func (m *Manager) stateFile() string {
|
||||
return m.stateManager.StateFile()
|
||||
}
|
||||
|
||||
// loadState loads witness state from disk.
|
||||
func (m *Manager) loadState() (*Witness, error) {
|
||||
return m.stateManager.Load()
|
||||
}
|
||||
|
||||
// saveState persists witness state to disk using atomic write.
|
||||
func (m *Manager) saveState(w *Witness) error {
|
||||
return m.stateManager.Save(w)
|
||||
// IsRunning checks if the witness session is active.
|
||||
// ZFC: tmux session existence is the source of truth.
|
||||
func (m *Manager) IsRunning() (bool, error) {
|
||||
t := tmux.NewTmux()
|
||||
return t.HasSession(m.SessionName())
|
||||
}
|
||||
|
||||
// SessionName returns the tmux session name for this witness.
|
||||
@@ -66,19 +51,21 @@ func (m *Manager) SessionName() string {
|
||||
return fmt.Sprintf("gt-%s-witness", m.rig.Name)
|
||||
}
|
||||
|
||||
// Status returns the current witness status.
|
||||
// ZFC-compliant: trusts agent-reported state, no PID inference.
|
||||
// The daemon reads agent bead state for liveness checks.
|
||||
func (m *Manager) Status() (*Witness, error) {
|
||||
w, err := m.loadState()
|
||||
// Status returns information about the witness session.
|
||||
// ZFC-compliant: tmux session is the source of truth.
|
||||
func (m *Manager) Status() (*tmux.SessionInfo, error) {
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
|
||||
running, err := t.HasSession(sessionID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("checking session: %w", err)
|
||||
}
|
||||
if !running {
|
||||
return nil, ErrNotRunning
|
||||
}
|
||||
|
||||
// Update monitored polecats list (still useful for display)
|
||||
w.MonitoredPolecats = m.rig.Polecats
|
||||
|
||||
return w, nil
|
||||
return t.GetSessionInfo(sessionID)
|
||||
}
|
||||
|
||||
// witnessDir returns the working directory for the witness.
|
||||
@@ -98,36 +85,21 @@ func (m *Manager) witnessDir() string {
|
||||
}
|
||||
|
||||
// Start starts the witness.
|
||||
// If foreground is true, only updates state (no tmux session - deprecated).
|
||||
// If foreground is true, returns an error (foreground mode deprecated).
|
||||
// Otherwise, spawns a Claude agent in a tmux session.
|
||||
// agentOverride optionally specifies a different agent alias to use.
|
||||
// envOverrides are KEY=VALUE pairs that override all other env var sources.
|
||||
// ZFC-compliant: no state file, tmux session is source of truth.
|
||||
func (m *Manager) Start(foreground bool, agentOverride string, envOverrides []string) error {
|
||||
w, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
|
||||
if foreground {
|
||||
// Foreground mode is deprecated - patrol logic moved to mol-witness-patrol
|
||||
// Just check tmux session (no PID inference per ZFC)
|
||||
if running, _ := t.HasSession(sessionID); running && t.IsClaudeRunning(sessionID) {
|
||||
return ErrAlreadyRunning
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
w.State = StateRunning
|
||||
w.StartedAt = &now
|
||||
w.PID = 0 // No longer track PID (ZFC)
|
||||
w.MonitoredPolecats = m.rig.Polecats
|
||||
|
||||
return m.saveState(w)
|
||||
return fmt.Errorf("foreground mode is deprecated; use background mode (remove --foreground flag)")
|
||||
}
|
||||
|
||||
// Background mode: check if session already exists
|
||||
// Check if session already exists
|
||||
running, _ := t.HasSession(sessionID)
|
||||
if running {
|
||||
// Session exists - check if Claude is actually running (healthy vs zombie)
|
||||
@@ -200,17 +172,6 @@ func (m *Manager) Start(foreground bool, agentOverride string, envOverrides []st
|
||||
theme := tmux.AssignTheme(m.rig.Name)
|
||||
_ = t.ConfigureGasTownSession(sessionID, theme, m.rig.Name, "witness", "witness")
|
||||
|
||||
// Update state to running
|
||||
now := time.Now()
|
||||
w.State = StateRunning
|
||||
w.StartedAt = &now
|
||||
w.PID = 0 // Claude agent doesn't have a PID we track
|
||||
w.MonitoredPolecats = m.rig.Polecats
|
||||
if err := m.saveState(w); err != nil {
|
||||
_ = t.KillSession(sessionID) // best-effort cleanup on state save failure
|
||||
return fmt.Errorf("saving state: %w", err)
|
||||
}
|
||||
|
||||
// Wait for Claude to start - fatal if Claude fails to launch
|
||||
if err := t.WaitForCommand(sessionID, constants.SupportedShells, constants.ClaudeStartTimeout); err != nil {
|
||||
// Kill the zombie session before returning error
|
||||
@@ -288,31 +249,17 @@ func buildWitnessStartCommand(rigPath, rigName, townRoot, agentOverride string,
|
||||
}
|
||||
|
||||
// Stop stops the witness.
|
||||
// ZFC-compliant: tmux session is the source of truth.
|
||||
func (m *Manager) Stop() error {
|
||||
w, err := m.loadState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if tmux session exists
|
||||
t := tmux.NewTmux()
|
||||
sessionID := m.SessionName()
|
||||
sessionRunning, _ := t.HasSession(sessionID)
|
||||
|
||||
// If neither state nor session indicates running, it's not running
|
||||
if w.State != StateRunning && !sessionRunning {
|
||||
// Check if tmux session exists
|
||||
running, _ := t.HasSession(sessionID)
|
||||
if !running {
|
||||
return ErrNotRunning
|
||||
}
|
||||
|
||||
// Kill tmux session if it exists (best-effort: may already be dead)
|
||||
if sessionRunning {
|
||||
_ = t.KillSession(sessionID)
|
||||
}
|
||||
|
||||
// Note: No PID-based stop per ZFC - tmux session kill is sufficient
|
||||
|
||||
w.State = StateStopped
|
||||
w.PID = 0
|
||||
|
||||
return m.saveState(w)
|
||||
// Kill the tmux session
|
||||
return t.KillSession(sessionID)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user