fix: improve --town mode robustness from code review
- resolveBeadsRedirect now verifies target exists before returning - Added failure tracking to runTownActivityFollow (warns on rig disconnect) - Created fetchTownMutationsWithStatus for tracking daemon availability - Shows reconnection message when rigs come back online 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -495,7 +495,8 @@ func discoverRigDaemons() []rigDaemon {
|
|||||||
return daemons
|
return daemons
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveBeadsRedirect follows a redirect file if present
|
// resolveBeadsRedirect follows a redirect file if present.
|
||||||
|
// Similar to routing.resolveRedirect but simplified for activity use.
|
||||||
func resolveBeadsRedirect(beadsDir string) string {
|
func resolveBeadsRedirect(beadsDir string) string {
|
||||||
redirectFile := filepath.Join(beadsDir, "redirect")
|
redirectFile := filepath.Join(beadsDir, "redirect")
|
||||||
data, err := os.ReadFile(redirectFile)
|
data, err := os.ReadFile(redirectFile)
|
||||||
@@ -513,12 +514,26 @@ func resolveBeadsRedirect(beadsDir string) string {
|
|||||||
redirectPath = filepath.Join(beadsDir, redirectPath)
|
redirectPath = filepath.Join(beadsDir, redirectPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
return filepath.Clean(redirectPath)
|
redirectPath = filepath.Clean(redirectPath)
|
||||||
|
|
||||||
|
// Verify target exists before returning
|
||||||
|
if info, err := os.Stat(redirectPath); err == nil && info.IsDir() {
|
||||||
|
return redirectPath
|
||||||
|
}
|
||||||
|
|
||||||
|
return beadsDir // Fallback to original if redirect target invalid
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchTownMutations retrieves mutations from all rig daemons
|
// fetchTownMutations retrieves mutations from all rig daemons
|
||||||
func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEvent {
|
func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEvent {
|
||||||
|
events, _ := fetchTownMutationsWithStatus(daemons, since)
|
||||||
|
return events
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchTownMutationsWithStatus retrieves mutations and returns count of responding daemons
|
||||||
|
func fetchTownMutationsWithStatus(daemons []rigDaemon, since time.Time) ([]rpc.MutationEvent, int) {
|
||||||
var allEvents []rpc.MutationEvent
|
var allEvents []rpc.MutationEvent
|
||||||
|
activeCount := 0
|
||||||
|
|
||||||
var sinceMillis int64
|
var sinceMillis int64
|
||||||
if !since.IsZero() {
|
if !since.IsZero() {
|
||||||
@@ -535,6 +550,8 @@ func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEven
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
activeCount++
|
||||||
|
|
||||||
var mutations []rpc.MutationEvent
|
var mutations []rpc.MutationEvent
|
||||||
if err := json.Unmarshal(resp.Data, &mutations); err != nil {
|
if err := json.Unmarshal(resp.Data, &mutations); err != nil {
|
||||||
continue
|
continue
|
||||||
@@ -548,7 +565,7 @@ func fetchTownMutations(daemons []rigDaemon, since time.Time) []rpc.MutationEven
|
|||||||
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
|
return allEvents[i].Timestamp.Before(allEvents[j].Timestamp)
|
||||||
})
|
})
|
||||||
|
|
||||||
return allEvents
|
return allEvents, activeCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// runTownActivityOnce fetches and displays events from all rigs once
|
// runTownActivityOnce fetches and displays events from all rigs once
|
||||||
@@ -649,12 +666,43 @@ func runTownActivityFollow(sinceTime time.Time) {
|
|||||||
ticker := time.NewTicker(activityInterval)
|
ticker := time.NewTicker(activityInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Track failures for warning messages
|
||||||
|
consecutiveFailures := 0
|
||||||
|
const failureWarningThreshold = 5
|
||||||
|
lastWarningTime := time.Time{}
|
||||||
|
lastActiveCount := activeCount
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-rootCtx.Done():
|
case <-rootCtx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
newEvents := fetchTownMutations(daemons, lastPoll)
|
newEvents, currentActive := fetchTownMutationsWithStatus(daemons, lastPoll)
|
||||||
|
|
||||||
|
// Track daemon availability changes
|
||||||
|
if currentActive < lastActiveCount {
|
||||||
|
consecutiveFailures++
|
||||||
|
if consecutiveFailures >= failureWarningThreshold {
|
||||||
|
if time.Since(lastWarningTime) >= 30*time.Second {
|
||||||
|
if !jsonOutput {
|
||||||
|
timestamp := time.Now().Format("15:04:05")
|
||||||
|
fmt.Fprintf(os.Stderr, "[%s] %s some rigs unreachable (%d/%d active)\n",
|
||||||
|
timestamp, ui.RenderWarn("!"), currentActive, len(daemons))
|
||||||
|
}
|
||||||
|
lastWarningTime = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if currentActive > lastActiveCount {
|
||||||
|
// Daemon came back
|
||||||
|
if !jsonOutput {
|
||||||
|
timestamp := time.Now().Format("15:04:05")
|
||||||
|
fmt.Fprintf(os.Stderr, "[%s] %s rig reconnected (%d/%d active)\n",
|
||||||
|
timestamp, ui.RenderPass("✓"), currentActive, len(daemons))
|
||||||
|
}
|
||||||
|
consecutiveFailures = 0
|
||||||
|
}
|
||||||
|
lastActiveCount = currentActive
|
||||||
|
|
||||||
newEvents = filterEvents(newEvents)
|
newEvents = filterEvents(newEvents)
|
||||||
|
|
||||||
for _, e := range newEvents {
|
for _, e := range newEvents {
|
||||||
|
|||||||
Reference in New Issue
Block a user