Add identity collision prevention, detection, and correction (gt-xp2s)
- internal/lock: New package with PID-based lockfiles for worker identity - gt prime: Acquire identity lock for crew/polecat roles, fail on collision - gt agents check: Detect stale locks and identity collisions - gt agents fix: Clean up stale locks - gt doctor: New identity-collision check with --fix support 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,14 +1,19 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/steveyegge/gastown/internal/lock"
|
||||||
|
"github.com/steveyegge/gastown/internal/style"
|
||||||
"github.com/steveyegge/gastown/internal/tmux"
|
"github.com/steveyegge/gastown/internal/tmux"
|
||||||
|
"github.com/steveyegge/gastown/internal/workspace"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AgentType represents the type of Gas Town agent.
|
// AgentType represents the type of Gas Town agent.
|
||||||
@@ -71,11 +76,49 @@ var agentsListCmd = &cobra.Command{
|
|||||||
RunE: runAgentsList,
|
RunE: runAgentsList,
|
||||||
}
|
}
|
||||||
|
|
||||||
var agentsAllFlag bool
|
var agentsCheckCmd = &cobra.Command{
|
||||||
|
Use: "check",
|
||||||
|
Short: "Check for identity collisions and stale locks",
|
||||||
|
Long: `Check for identity collisions and stale locks.
|
||||||
|
|
||||||
|
This command helps detect situations where multiple Claude processes
|
||||||
|
think they own the same worker identity.
|
||||||
|
|
||||||
|
Output shows:
|
||||||
|
- Active tmux sessions with gt- prefix
|
||||||
|
- Identity locks in worker directories
|
||||||
|
- Collisions (multiple agents claiming same identity)
|
||||||
|
- Stale locks (dead PIDs)`,
|
||||||
|
RunE: runAgentsCheck,
|
||||||
|
}
|
||||||
|
|
||||||
|
var agentsFixCmd = &cobra.Command{
|
||||||
|
Use: "fix",
|
||||||
|
Short: "Fix identity collisions and clean up stale locks",
|
||||||
|
Long: `Clean up identity collisions and stale locks.
|
||||||
|
|
||||||
|
This command:
|
||||||
|
1. Removes stale locks (where the PID is dead)
|
||||||
|
2. Reports collisions that need manual intervention
|
||||||
|
|
||||||
|
For collisions with live processes, you must manually:
|
||||||
|
- Kill the duplicate session, OR
|
||||||
|
- Decide which agent should own the identity`,
|
||||||
|
RunE: runAgentsFix,
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
agentsAllFlag bool
|
||||||
|
agentsCheckJSON bool
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
agentsCmd.PersistentFlags().BoolVarP(&agentsAllFlag, "all", "a", false, "Include polecats in the menu")
|
agentsCmd.PersistentFlags().BoolVarP(&agentsAllFlag, "all", "a", false, "Include polecats in the menu")
|
||||||
|
agentsCheckCmd.Flags().BoolVar(&agentsCheckJSON, "json", false, "Output as JSON")
|
||||||
|
|
||||||
agentsCmd.AddCommand(agentsListCmd)
|
agentsCmd.AddCommand(agentsListCmd)
|
||||||
|
agentsCmd.AddCommand(agentsCheckCmd)
|
||||||
|
agentsCmd.AddCommand(agentsFixCmd)
|
||||||
rootCmd.AddCommand(agentsCmd)
|
rootCmd.AddCommand(agentsCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,3 +376,200 @@ func runAgentsList(cmd *cobra.Command, args []string) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CollisionReport holds the results of a collision check.
|
||||||
|
type CollisionReport struct {
|
||||||
|
TotalSessions int `json:"total_sessions"`
|
||||||
|
TotalLocks int `json:"total_locks"`
|
||||||
|
Collisions int `json:"collisions"`
|
||||||
|
StaleLocks int `json:"stale_locks"`
|
||||||
|
Issues []CollisionIssue `json:"issues,omitempty"`
|
||||||
|
Locks map[string]*lock.LockInfo `json:"locks,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CollisionIssue describes a single collision or lock issue.
|
||||||
|
type CollisionIssue struct {
|
||||||
|
Type string `json:"type"` // "stale", "collision", "orphaned"
|
||||||
|
WorkerDir string `json:"worker_dir"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
PID int `json:"pid,omitempty"`
|
||||||
|
SessionID string `json:"session_id,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAgentsCheck(cmd *cobra.Command, args []string) error {
|
||||||
|
townRoot, err := workspace.FindFromCwdOrError()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("not in a Gas Town workspace: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
report, err := buildCollisionReport(townRoot)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if agentsCheckJSON {
|
||||||
|
enc := json.NewEncoder(os.Stdout)
|
||||||
|
enc.SetIndent("", " ")
|
||||||
|
return enc.Encode(report)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Text output
|
||||||
|
if len(report.Issues) == 0 {
|
||||||
|
fmt.Printf("%s All agents healthy\n", style.Bold.Render("✓"))
|
||||||
|
fmt.Printf(" Sessions: %d, Locks: %d\n", report.TotalSessions, report.TotalLocks)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("%s\n\n", style.Bold.Render("⚠️ Issues Detected"))
|
||||||
|
fmt.Printf("Collisions: %d, Stale locks: %d\n\n", report.Collisions, report.StaleLocks)
|
||||||
|
|
||||||
|
for _, issue := range report.Issues {
|
||||||
|
fmt.Printf("%s %s\n", style.Bold.Render("!"), issue.Message)
|
||||||
|
fmt.Printf(" Dir: %s\n", issue.WorkerDir)
|
||||||
|
if issue.PID > 0 {
|
||||||
|
fmt.Printf(" PID: %d\n", issue.PID)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Run %s to fix stale locks\n", style.Dim.Render("gt agents fix"))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func runAgentsFix(cmd *cobra.Command, args []string) error {
|
||||||
|
townRoot, err := workspace.FindFromCwdOrError()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("not in a Gas Town workspace: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean stale locks
|
||||||
|
cleaned, err := lock.CleanStaleLocks(townRoot)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cleaning stale locks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cleaned > 0 {
|
||||||
|
fmt.Printf("%s Cleaned %d stale lock(s)\n", style.Bold.Render("✓"), cleaned)
|
||||||
|
} else {
|
||||||
|
fmt.Printf("%s No stale locks found\n", style.Dim.Render("○"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for remaining issues
|
||||||
|
report, err := buildCollisionReport(townRoot)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if report.Collisions > 0 {
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Printf("%s %d collision(s) require manual intervention:\n\n",
|
||||||
|
style.Bold.Render("⚠"), report.Collisions)
|
||||||
|
|
||||||
|
for _, issue := range report.Issues {
|
||||||
|
if issue.Type == "collision" {
|
||||||
|
fmt.Printf(" %s %s\n", style.Bold.Render("!"), issue.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Printf("To fix, close duplicate sessions or remove lock files manually.\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildCollisionReport(townRoot string) (*CollisionReport, error) {
|
||||||
|
report := &CollisionReport{
|
||||||
|
Locks: make(map[string]*lock.LockInfo),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all tmux sessions
|
||||||
|
t := tmux.NewTmux()
|
||||||
|
sessions, err := t.ListSessions()
|
||||||
|
if err != nil {
|
||||||
|
sessions = []string{} // Continue even if tmux not running
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter to gt- sessions
|
||||||
|
var gtSessions []string
|
||||||
|
for _, s := range sessions {
|
||||||
|
if strings.HasPrefix(s, "gt-") {
|
||||||
|
gtSessions = append(gtSessions, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
report.TotalSessions = len(gtSessions)
|
||||||
|
|
||||||
|
// Find all locks
|
||||||
|
locks, err := lock.FindAllLocks(townRoot)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("finding locks: %w", err)
|
||||||
|
}
|
||||||
|
report.TotalLocks = len(locks)
|
||||||
|
report.Locks = locks
|
||||||
|
|
||||||
|
// Check each lock for issues
|
||||||
|
for workerDir, lockInfo := range locks {
|
||||||
|
if lockInfo.IsStale() {
|
||||||
|
report.StaleLocks++
|
||||||
|
report.Issues = append(report.Issues, CollisionIssue{
|
||||||
|
Type: "stale",
|
||||||
|
WorkerDir: workerDir,
|
||||||
|
Message: fmt.Sprintf("Stale lock (dead PID %d)", lockInfo.PID),
|
||||||
|
PID: lockInfo.PID,
|
||||||
|
SessionID: lockInfo.SessionID,
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the locked session exists in tmux
|
||||||
|
expectedSession := guessSessionFromWorkerDir(workerDir, townRoot)
|
||||||
|
if expectedSession != "" {
|
||||||
|
found := false
|
||||||
|
for _, s := range gtSessions {
|
||||||
|
if s == expectedSession {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
// Lock exists but session doesn't - potential orphan or collision
|
||||||
|
report.Collisions++
|
||||||
|
report.Issues = append(report.Issues, CollisionIssue{
|
||||||
|
Type: "orphaned",
|
||||||
|
WorkerDir: workerDir,
|
||||||
|
Message: fmt.Sprintf("Lock exists (PID %d) but no tmux session '%s'", lockInfo.PID, expectedSession),
|
||||||
|
PID: lockInfo.PID,
|
||||||
|
SessionID: lockInfo.SessionID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return report, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func guessSessionFromWorkerDir(workerDir, townRoot string) string {
|
||||||
|
relPath, err := filepath.Rel(townRoot, workerDir)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.Split(filepath.ToSlash(relPath), "/")
|
||||||
|
if len(parts) < 3 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
rig := parts[0]
|
||||||
|
workerType := parts[1]
|
||||||
|
workerName := parts[2]
|
||||||
|
|
||||||
|
switch workerType {
|
||||||
|
case "crew":
|
||||||
|
return fmt.Sprintf("gt-%s-crew-%s", rig, workerName)
|
||||||
|
case "polecats":
|
||||||
|
return fmt.Sprintf("gt-%s-%s", rig, workerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ func runDoctor(cmd *cobra.Command, args []string) error {
|
|||||||
d.Register(doctor.NewOrphanProcessCheck())
|
d.Register(doctor.NewOrphanProcessCheck())
|
||||||
d.Register(doctor.NewBranchCheck())
|
d.Register(doctor.NewBranchCheck())
|
||||||
d.Register(doctor.NewBeadsSyncOrphanCheck())
|
d.Register(doctor.NewBeadsSyncOrphanCheck())
|
||||||
|
d.Register(doctor.NewIdentityCollisionCheck())
|
||||||
|
|
||||||
// Ephemeral beads checks
|
// Ephemeral beads checks
|
||||||
d.Register(doctor.NewEphemeralExistsCheck())
|
d.Register(doctor.NewEphemeralExistsCheck())
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
@@ -10,6 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
|
"github.com/steveyegge/gastown/internal/lock"
|
||||||
"github.com/steveyegge/gastown/internal/style"
|
"github.com/steveyegge/gastown/internal/style"
|
||||||
"github.com/steveyegge/gastown/internal/templates"
|
"github.com/steveyegge/gastown/internal/templates"
|
||||||
"github.com/steveyegge/gastown/internal/workspace"
|
"github.com/steveyegge/gastown/internal/workspace"
|
||||||
@@ -74,6 +76,11 @@ func runPrime(cmd *cobra.Command, args []string) error {
|
|||||||
// Detect role
|
// Detect role
|
||||||
ctx := detectRole(cwd, townRoot)
|
ctx := detectRole(cwd, townRoot)
|
||||||
|
|
||||||
|
// Check and acquire identity lock for worker roles
|
||||||
|
if err := acquireIdentityLock(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure beads redirect exists for worktree-based roles
|
// Ensure beads redirect exists for worktree-based roles
|
||||||
ensureBeadsRedirect(ctx)
|
ensureBeadsRedirect(ctx)
|
||||||
|
|
||||||
@@ -668,6 +675,57 @@ func outputDeaconPatrolContext(ctx RoleContext) {
|
|||||||
fmt.Println(" gt mol bond mol-deacon-patrol")
|
fmt.Println(" gt mol bond mol-deacon-patrol")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// acquireIdentityLock checks and acquires the identity lock for worker roles.
|
||||||
|
// This prevents multiple agents from claiming the same worker identity.
|
||||||
|
// Returns an error if another agent already owns this identity.
|
||||||
|
func acquireIdentityLock(ctx RoleContext) error {
|
||||||
|
// Only lock worker roles (polecat, crew)
|
||||||
|
// Infrastructure roles (mayor, witness, refinery, deacon) are singletons
|
||||||
|
// managed by tmux session names, so they don't need file-based locks
|
||||||
|
if ctx.Role != RolePolecat && ctx.Role != RoleCrew {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create lock for this worker directory
|
||||||
|
l := lock.New(ctx.WorkDir)
|
||||||
|
|
||||||
|
// Determine session ID from environment or context
|
||||||
|
sessionID := os.Getenv("TMUX_PANE")
|
||||||
|
if sessionID == "" {
|
||||||
|
// Fall back to a descriptive identifier
|
||||||
|
sessionID = fmt.Sprintf("%s/%s", ctx.Rig, ctx.Polecat)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to acquire the lock
|
||||||
|
if err := l.Acquire(sessionID); err != nil {
|
||||||
|
if errors.Is(err, lock.ErrLocked) {
|
||||||
|
// Another agent owns this identity
|
||||||
|
fmt.Printf("\n%s\n\n", style.Bold.Render("⚠️ IDENTITY COLLISION DETECTED"))
|
||||||
|
fmt.Printf("Another agent already claims this worker identity.\n\n")
|
||||||
|
|
||||||
|
// Show lock details
|
||||||
|
if info, readErr := l.Read(); readErr == nil {
|
||||||
|
fmt.Printf("Lock holder:\n")
|
||||||
|
fmt.Printf(" PID: %d\n", info.PID)
|
||||||
|
fmt.Printf(" Session: %s\n", info.SessionID)
|
||||||
|
fmt.Printf(" Acquired: %s\n", info.AcquiredAt.Format("2006-01-02 15:04:05"))
|
||||||
|
fmt.Println()
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("To resolve:\n")
|
||||||
|
fmt.Printf(" 1. Find the other session and close it, OR\n")
|
||||||
|
fmt.Printf(" 2. Run: gt doctor --fix (cleans stale locks)\n")
|
||||||
|
fmt.Printf(" 3. If lock is stale: rm %s/.gastown/agent.lock\n", ctx.WorkDir)
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
return fmt.Errorf("cannot claim identity %s/%s: %w", ctx.Rig, ctx.Polecat, err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("acquiring identity lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ensureBeadsRedirect ensures the .beads/redirect file exists for worktree-based roles.
|
// ensureBeadsRedirect ensures the .beads/redirect file exists for worktree-based roles.
|
||||||
// This handles cases where git clean or other operations delete the redirect file.
|
// This handles cases where git clean or other operations delete the redirect file.
|
||||||
func ensureBeadsRedirect(ctx RoleContext) {
|
func ensureBeadsRedirect(ctx RoleContext) {
|
||||||
|
|||||||
136
internal/doctor/identity_check.go
Normal file
136
internal/doctor/identity_check.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package doctor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/steveyegge/gastown/internal/lock"
|
||||||
|
"github.com/steveyegge/gastown/internal/tmux"
|
||||||
|
)
|
||||||
|
|
||||||
|
// IdentityCollisionCheck checks for agent identity collisions and stale locks.
|
||||||
|
type IdentityCollisionCheck struct{}
|
||||||
|
|
||||||
|
// NewIdentityCollisionCheck creates a new identity collision check.
|
||||||
|
func NewIdentityCollisionCheck() *IdentityCollisionCheck {
|
||||||
|
return &IdentityCollisionCheck{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *IdentityCollisionCheck) Name() string {
|
||||||
|
return "identity-collision"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *IdentityCollisionCheck) Description() string {
|
||||||
|
return "Check for agent identity collisions and stale locks"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *IdentityCollisionCheck) CanFix() bool {
|
||||||
|
return true // Can fix stale locks
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *IdentityCollisionCheck) Run(ctx *CheckContext) *CheckResult {
|
||||||
|
// Find all locks
|
||||||
|
locks, err := lock.FindAllLocks(ctx.TownRoot)
|
||||||
|
if err != nil {
|
||||||
|
return &CheckResult{
|
||||||
|
Name: c.Name(),
|
||||||
|
Status: StatusWarning,
|
||||||
|
Message: fmt.Sprintf("could not scan for locks: %v", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(locks) == 0 {
|
||||||
|
return &CheckResult{
|
||||||
|
Name: c.Name(),
|
||||||
|
Status: StatusOK,
|
||||||
|
Message: "no worker locks found",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get active tmux sessions for cross-reference
|
||||||
|
t := tmux.NewTmux()
|
||||||
|
sessions, _ := t.ListSessions() // Ignore errors - might not have tmux
|
||||||
|
|
||||||
|
sessionSet := make(map[string]bool)
|
||||||
|
for _, s := range sessions {
|
||||||
|
sessionSet[s] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
var staleLocks []string
|
||||||
|
var orphanedLocks []string
|
||||||
|
var healthyLocks int
|
||||||
|
|
||||||
|
for workerDir, info := range locks {
|
||||||
|
if info.IsStale() {
|
||||||
|
staleLocks = append(staleLocks,
|
||||||
|
fmt.Sprintf("%s (dead PID %d)", workerDir, info.PID))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if session exists
|
||||||
|
if info.SessionID != "" && !sessionSet[info.SessionID] {
|
||||||
|
// Lock has session ID but session doesn't exist
|
||||||
|
// This could be a collision or orphan
|
||||||
|
orphanedLocks = append(orphanedLocks,
|
||||||
|
fmt.Sprintf("%s (PID %d, missing session %s)", workerDir, info.PID, info.SessionID))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
healthyLocks++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build result
|
||||||
|
if len(staleLocks) == 0 && len(orphanedLocks) == 0 {
|
||||||
|
return &CheckResult{
|
||||||
|
Name: c.Name(),
|
||||||
|
Status: StatusOK,
|
||||||
|
Message: fmt.Sprintf("%d worker lock(s), all healthy", healthyLocks),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &CheckResult{
|
||||||
|
Name: c.Name(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(staleLocks) > 0 {
|
||||||
|
result.Status = StatusWarning
|
||||||
|
result.Message = fmt.Sprintf("%d stale lock(s) found", len(staleLocks))
|
||||||
|
result.Details = append(result.Details, "Stale locks (dead PIDs):")
|
||||||
|
for _, s := range staleLocks {
|
||||||
|
result.Details = append(result.Details, " "+s)
|
||||||
|
}
|
||||||
|
result.FixHint = "Run 'gt doctor --fix' or 'gt agents fix' to clean up"
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(orphanedLocks) > 0 {
|
||||||
|
if result.Status != StatusWarning {
|
||||||
|
result.Status = StatusWarning
|
||||||
|
}
|
||||||
|
if result.Message != "" {
|
||||||
|
result.Message += ", "
|
||||||
|
}
|
||||||
|
result.Message += fmt.Sprintf("%d orphaned lock(s)", len(orphanedLocks))
|
||||||
|
result.Details = append(result.Details, "Orphaned locks (missing sessions):")
|
||||||
|
for _, s := range orphanedLocks {
|
||||||
|
result.Details = append(result.Details, " "+s)
|
||||||
|
}
|
||||||
|
if !strings.Contains(result.FixHint, "doctor") {
|
||||||
|
result.FixHint = "Run 'gt doctor --fix' to clean up stale locks"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *IdentityCollisionCheck) Fix(ctx *CheckContext) error {
|
||||||
|
cleaned, err := lock.CleanStaleLocks(ctx.TownRoot)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cleaning stale locks: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cleaned > 0 {
|
||||||
|
fmt.Printf(" Cleaned %d stale lock(s)\n", cleaned)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
298
internal/lock/lock.go
Normal file
298
internal/lock/lock.go
Normal file
@@ -0,0 +1,298 @@
|
|||||||
|
// Package lock provides agent identity locking to prevent multiple agents
|
||||||
|
// from claiming the same worker identity.
|
||||||
|
//
|
||||||
|
// Lock files are stored at <worker>/.gastown/agent.lock and contain:
|
||||||
|
// - PID of the owning process
|
||||||
|
// - Timestamp when lock was acquired
|
||||||
|
// - Session ID (tmux session name)
|
||||||
|
//
|
||||||
|
// Stale locks (where the PID is dead) are automatically cleaned up.
|
||||||
|
package lock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Common errors
|
||||||
|
var (
|
||||||
|
ErrLocked = errors.New("worker is locked by another agent")
|
||||||
|
ErrNotLocked = errors.New("worker is not locked")
|
||||||
|
ErrStaleLock = errors.New("stale lock detected")
|
||||||
|
ErrInvalidLock = errors.New("invalid lock file")
|
||||||
|
)
|
||||||
|
|
||||||
|
// LockInfo contains information about who holds a lock.
|
||||||
|
type LockInfo struct {
|
||||||
|
PID int `json:"pid"`
|
||||||
|
AcquiredAt time.Time `json:"acquired_at"`
|
||||||
|
SessionID string `json:"session_id,omitempty"`
|
||||||
|
Hostname string `json:"hostname,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsStale checks if the lock is stale (owning process is dead).
|
||||||
|
func (l *LockInfo) IsStale() bool {
|
||||||
|
return !processExists(l.PID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock represents an agent identity lock for a worker directory.
|
||||||
|
type Lock struct {
|
||||||
|
workerDir string
|
||||||
|
lockPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a Lock for the given worker directory.
|
||||||
|
func New(workerDir string) *Lock {
|
||||||
|
return &Lock{
|
||||||
|
workerDir: workerDir,
|
||||||
|
lockPath: filepath.Join(workerDir, ".gastown", "agent.lock"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acquire attempts to acquire the lock for this worker.
|
||||||
|
// Returns ErrLocked if another live process holds the lock.
|
||||||
|
// Automatically cleans up stale locks.
|
||||||
|
func (l *Lock) Acquire(sessionID string) error {
|
||||||
|
// Check for existing lock
|
||||||
|
info, err := l.Read()
|
||||||
|
if err == nil {
|
||||||
|
// Lock exists - check if stale
|
||||||
|
if info.IsStale() {
|
||||||
|
// Stale lock - remove it
|
||||||
|
if err := l.Release(); err != nil {
|
||||||
|
return fmt.Errorf("removing stale lock: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Active lock - check if it's us
|
||||||
|
if info.PID == os.Getpid() {
|
||||||
|
// We already hold it - refresh
|
||||||
|
return l.write(sessionID)
|
||||||
|
}
|
||||||
|
// Another process holds it
|
||||||
|
return fmt.Errorf("%w: PID %d (session: %s, acquired: %s)",
|
||||||
|
ErrLocked, info.PID, info.SessionID, info.AcquiredAt.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No lock or stale lock removed - acquire it
|
||||||
|
return l.write(sessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release releases the lock if we hold it.
|
||||||
|
func (l *Lock) Release() error {
|
||||||
|
if err := os.Remove(l.lockPath); err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("removing lock file: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads the current lock info without modifying it.
|
||||||
|
func (l *Lock) Read() (*LockInfo, error) {
|
||||||
|
data, err := os.ReadFile(l.lockPath)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil, ErrNotLocked
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("reading lock file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var info LockInfo
|
||||||
|
if err := json.Unmarshal(data, &info); err != nil {
|
||||||
|
return nil, fmt.Errorf("%w: %v", ErrInvalidLock, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &info, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check checks if the worker is locked by another agent.
|
||||||
|
// Returns nil if unlocked or locked by us.
|
||||||
|
// Returns ErrLocked if locked by another live process.
|
||||||
|
// Automatically cleans up stale locks.
|
||||||
|
func (l *Lock) Check() error {
|
||||||
|
info, err := l.Read()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNotLocked) {
|
||||||
|
return nil // Not locked
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if stale
|
||||||
|
if info.IsStale() {
|
||||||
|
// Clean up stale lock
|
||||||
|
_ = l.Release()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if it's us
|
||||||
|
if info.PID == os.Getpid() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Locked by another process
|
||||||
|
return fmt.Errorf("%w: PID %d (session: %s)", ErrLocked, info.PID, info.SessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status returns a human-readable status of the lock.
|
||||||
|
func (l *Lock) Status() string {
|
||||||
|
info, err := l.Read()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, ErrNotLocked) {
|
||||||
|
return "unlocked"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.IsStale() {
|
||||||
|
return fmt.Sprintf("stale (dead PID %d)", info.PID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.PID == os.Getpid() {
|
||||||
|
return "locked (by us)"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("locked by PID %d (session: %s)", info.PID, info.SessionID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ForceRelease removes the lock regardless of who holds it.
|
||||||
|
// Use with caution - only for doctor --fix scenarios.
|
||||||
|
func (l *Lock) ForceRelease() error {
|
||||||
|
return l.Release()
|
||||||
|
}
|
||||||
|
|
||||||
|
// write creates or updates the lock file.
|
||||||
|
func (l *Lock) write(sessionID string) error {
|
||||||
|
// Ensure .gastown directory exists
|
||||||
|
dir := filepath.Dir(l.lockPath)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("creating lock directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
hostname, _ := os.Hostname()
|
||||||
|
info := LockInfo{
|
||||||
|
PID: os.Getpid(),
|
||||||
|
AcquiredAt: time.Now(),
|
||||||
|
SessionID: sessionID,
|
||||||
|
Hostname: hostname,
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.MarshalIndent(info, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshaling lock info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.WriteFile(l.lockPath, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("writing lock file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processExists checks if a process with the given PID exists and is alive.
|
||||||
|
func processExists(pid int) bool {
|
||||||
|
if pid <= 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// On Unix, sending signal 0 checks if process exists without affecting it
|
||||||
|
process, err := os.FindProcess(pid)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to send signal 0 - this will fail if process doesn't exist
|
||||||
|
err = process.Signal(syscall.Signal(0))
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAllLocks scans a directory tree for agent.lock files.
|
||||||
|
// Returns a map of worker directory -> LockInfo.
|
||||||
|
func FindAllLocks(root string) (map[string]*LockInfo, error) {
|
||||||
|
locks := make(map[string]*LockInfo)
|
||||||
|
|
||||||
|
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return nil // Skip errors
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if filepath.Base(path) == "agent.lock" && filepath.Base(filepath.Dir(path)) == ".gastown" {
|
||||||
|
workerDir := filepath.Dir(filepath.Dir(path))
|
||||||
|
lock := New(workerDir)
|
||||||
|
lockInfo, err := lock.Read()
|
||||||
|
if err == nil {
|
||||||
|
locks[workerDir] = lockInfo
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return locks, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// CleanStaleLocks removes all stale locks in a directory tree.
|
||||||
|
// Returns the number of stale locks cleaned.
|
||||||
|
func CleanStaleLocks(root string) (int, error) {
|
||||||
|
locks, err := FindAllLocks(root)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cleaned := 0
|
||||||
|
for workerDir, info := range locks {
|
||||||
|
if info.IsStale() {
|
||||||
|
lock := New(workerDir)
|
||||||
|
if err := lock.Release(); err == nil {
|
||||||
|
cleaned++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return cleaned, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetectCollisions finds workers with multiple agents claiming the same identity.
|
||||||
|
// This detects the case where multiple processes think they own the same worker
|
||||||
|
// by comparing tmux sessions with lock files.
|
||||||
|
// Returns a list of collision descriptions.
|
||||||
|
func DetectCollisions(root string, activeSessions []string) []string {
|
||||||
|
var collisions []string
|
||||||
|
|
||||||
|
locks, err := FindAllLocks(root)
|
||||||
|
if err != nil {
|
||||||
|
return collisions
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build set of active sessions
|
||||||
|
activeSet := make(map[string]bool)
|
||||||
|
for _, s := range activeSessions {
|
||||||
|
activeSet[s] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for workerDir, info := range locks {
|
||||||
|
if info.IsStale() {
|
||||||
|
collisions = append(collisions,
|
||||||
|
fmt.Sprintf("stale lock in %s (dead PID %d, session: %s)",
|
||||||
|
workerDir, info.PID, info.SessionID))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the session in the lock matches an active session
|
||||||
|
if info.SessionID != "" && !activeSet[info.SessionID] {
|
||||||
|
collisions = append(collisions,
|
||||||
|
fmt.Sprintf("orphaned lock in %s (session %s not found, PID %d still alive)",
|
||||||
|
workerDir, info.SessionID, info.PID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return collisions
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user