Merge fix/convoy-last-activity into feature/convoy-dashboard

Merges PR #85 (fix/convoy-last-activity) into PR #71 (feature/convoy-dashboard).
Resolved conflict in fetcher.go by taking the simplified tmux-based activity tracking.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Mike Lady
2026-01-03 17:39:22 -08:00
6 changed files with 637 additions and 68 deletions

View File

@@ -30,6 +30,7 @@ func NewLiveConvoyFetcher() (*LiveConvoyFetcher, error) {
}, nil
}
// FetchConvoys fetches all open convoys with their activity data.
func (f *LiveConvoyFetcher) FetchConvoys() ([]ConvoyRow, error) {
// List all open convoy-type issues
@@ -68,6 +69,8 @@ func (f *LiveConvoyFetcher) FetchConvoys() ([]ConvoyRow, error) {
row.Total = len(tracked)
var mostRecentActivity time.Time
var mostRecentUpdated time.Time
var hasAssignee bool
for _, t := range tracked {
if t.Status == "closed" {
row.Completed++
@@ -76,16 +79,43 @@ func (f *LiveConvoyFetcher) FetchConvoys() ([]ConvoyRow, error) {
if t.LastActivity.After(mostRecentActivity) {
mostRecentActivity = t.LastActivity
}
// Track most recent updated_at as fallback
if t.UpdatedAt.After(mostRecentUpdated) {
mostRecentUpdated = t.UpdatedAt
}
if t.Assignee != "" {
hasAssignee = true
}
}
row.Progress = fmt.Sprintf("%d/%d", row.Completed, row.Total)
// Calculate activity info from most recent worker activity
if !mostRecentActivity.IsZero() {
// Have active tmux session activity from assigned workers
row.LastActivity = activity.Calculate(mostRecentActivity)
} else if !hasAssignee {
// No assignees found in beads - try fallback to any running polecat activity
// This handles cases where bd update --assignee didn't persist or wasn't returned
if polecatActivity := f.getAllPolecatActivity(); polecatActivity != nil {
info := activity.Calculate(*polecatActivity)
info.FormattedAge = info.FormattedAge + " (polecat active)"
row.LastActivity = info
} else if !mostRecentUpdated.IsZero() {
// Fall back to issue updated_at if no polecats running
info := activity.Calculate(mostRecentUpdated)
info.FormattedAge = info.FormattedAge + " (unassigned)"
row.LastActivity = info
} else {
row.LastActivity = activity.Info{
FormattedAge: "unassigned",
ColorClass: activity.ColorUnknown,
}
}
} else {
// Has assignee but no active session
row.LastActivity = activity.Info{
FormattedAge: "no activity",
FormattedAge: "idle",
ColorClass: activity.ColorUnknown,
}
}
@@ -114,6 +144,7 @@ type trackedIssueInfo struct {
Status string
Assignee string
LastActivity time.Time
UpdatedAt time.Time // Fallback for activity when no assignee
}
// getTrackedIssues fetches tracked issues for a convoy.
@@ -156,8 +187,8 @@ func (f *LiveConvoyFetcher) getTrackedIssues(convoyID string) []trackedIssueInfo
// Batch fetch issue details
details := f.getIssueDetailsBatch(issueIDs)
// Get worker info for activity timestamps
workers := f.getWorkersForIssues(issueIDs)
// Get worker activity from tmux sessions based on assignees
workers := f.getWorkersFromAssignees(details)
// Build result
result := make([]trackedIssueInfo, 0, len(issueIDs))
@@ -168,6 +199,7 @@ func (f *LiveConvoyFetcher) getTrackedIssues(convoyID string) []trackedIssueInfo
info.Title = d.Title
info.Status = d.Status
info.Assignee = d.Assignee
info.UpdatedAt = d.UpdatedAt
} else {
info.Title = "(external)"
info.Status = "unknown"
@@ -185,10 +217,11 @@ func (f *LiveConvoyFetcher) getTrackedIssues(convoyID string) []trackedIssueInfo
// issueDetail holds basic issue info.
type issueDetail struct {
ID string
Title string
Status string
Assignee string
ID string
Title string
Status string
Assignee string
UpdatedAt time.Time
}
// getIssueDetailsBatch fetches details for multiple issues.
@@ -211,22 +244,30 @@ func (f *LiveConvoyFetcher) getIssueDetailsBatch(issueIDs []string) map[string]*
}
var issues []struct {
ID string `json:"id"`
Title string `json:"title"`
Status string `json:"status"`
Assignee string `json:"assignee"`
ID string `json:"id"`
Title string `json:"title"`
Status string `json:"status"`
Assignee string `json:"assignee"`
UpdatedAt string `json:"updated_at"`
}
if err := json.Unmarshal(stdout.Bytes(), &issues); err != nil {
return result
}
for _, issue := range issues {
result[issue.ID] = &issueDetail{
detail := &issueDetail{
ID: issue.ID,
Title: issue.Title,
Status: issue.Status,
Assignee: issue.Assignee,
}
// Parse updated_at timestamp
if issue.UpdatedAt != "" {
if t, err := time.Parse(time.RFC3339, issue.UpdatedAt); err == nil {
detail.UpdatedAt = t
}
}
result[issue.ID] = detail
}
return result
@@ -238,63 +279,381 @@ type workerDetail struct {
LastActivity *time.Time
}
// getWorkersForIssues finds workers and their last activity for issues.
func (f *LiveConvoyFetcher) getWorkersForIssues(issueIDs []string) map[string]*workerDetail {
// getWorkersFromAssignees gets worker activity from tmux sessions based on issue assignees.
// Assignees are in format "rigname/polecats/polecatname" which maps to tmux session "gt-rigname-polecatname".
func (f *LiveConvoyFetcher) getWorkersFromAssignees(details map[string]*issueDetail) map[string]*workerDetail {
result := make(map[string]*workerDetail)
if len(issueIDs) == 0 {
// Collect unique assignees and map them to issue IDs
assigneeToIssues := make(map[string][]string)
for issueID, detail := range details {
if detail == nil || detail.Assignee == "" {
continue
}
assigneeToIssues[detail.Assignee] = append(assigneeToIssues[detail.Assignee], issueID)
}
if len(assigneeToIssues) == 0 {
return result
}
townRoot, _ := workspace.FindFromCwd()
if townRoot == "" {
return result
}
// For each unique assignee, look up tmux session activity
for assignee, issueIDs := range assigneeToIssues {
activity := f.getSessionActivityForAssignee(assignee)
if activity == nil {
continue
}
// Find all rig beads databases
rigDirs, _ := filepath.Glob(filepath.Join(townRoot, "*", "mayor", "rig", ".beads", "beads.db"))
for _, dbPath := range rigDirs {
// Apply this activity to all issues assigned to this worker
for _, issueID := range issueIDs {
if _, ok := result[issueID]; ok {
continue
result[issueID] = &workerDetail{
Worker: assignee,
LastActivity: activity,
}
safeID := strings.ReplaceAll(issueID, "'", "''")
query := fmt.Sprintf(
`SELECT id, hook_bead, last_activity FROM issues WHERE issue_type = 'agent' AND status = 'open' AND hook_bead = '%s' LIMIT 1`,
safeID)
// #nosec G204 -- sqlite3 path is from trusted glob, issueID is escaped
queryCmd := exec.Command("sqlite3", "-json", dbPath, query)
var stdout bytes.Buffer
queryCmd.Stdout = &stdout
if err := queryCmd.Run(); err != nil {
continue
}
var agents []struct {
ID string `json:"id"`
HookBead string `json:"hook_bead"`
LastActivity string `json:"last_activity"`
}
if err := json.Unmarshal(stdout.Bytes(), &agents); err != nil || len(agents) == 0 {
continue
}
agent := agents[0]
detail := &workerDetail{
Worker: agent.ID,
}
if agent.LastActivity != "" {
if t, err := time.Parse(time.RFC3339, agent.LastActivity); err == nil {
detail.LastActivity = &t
}
}
result[issueID] = detail
}
}
return result
}
// getSessionActivityForAssignee looks up tmux session activity for an assignee.
// Assignee format: "rigname/polecats/polecatname" -> session "gt-rigname-polecatname"
func (f *LiveConvoyFetcher) getSessionActivityForAssignee(assignee string) *time.Time {
// Parse assignee: "roxas/polecats/dag" -> rig="roxas", polecat="dag"
parts := strings.Split(assignee, "/")
if len(parts) != 3 || parts[1] != "polecats" {
return nil
}
rig := parts[0]
polecat := parts[2]
// Construct session name
sessionName := fmt.Sprintf("gt-%s-%s", rig, polecat)
// Query tmux for session activity
// Format: session_activity returns unix timestamp
cmd := exec.Command("tmux", "list-sessions", "-F", "#{session_name}|#{session_activity}",
"-f", fmt.Sprintf("#{==:#{session_name},%s}", sessionName))
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return nil
}
output := strings.TrimSpace(stdout.String())
if output == "" {
return nil
}
// Parse output: "gt-roxas-dag|1704312345"
outputParts := strings.Split(output, "|")
if len(outputParts) < 2 {
return nil
}
var activityUnix int64
if _, err := fmt.Sscanf(outputParts[1], "%d", &activityUnix); err != nil || activityUnix == 0 {
return nil
}
activity := time.Unix(activityUnix, 0)
return &activity
}
// getAllPolecatActivity returns the most recent activity from any running polecat session.
// This is used as a fallback when no specific assignee activity can be determined.
// Returns nil if no polecat sessions are running.
func (f *LiveConvoyFetcher) getAllPolecatActivity() *time.Time {
// List all tmux sessions matching gt-*-* pattern (polecat sessions)
// Format: gt-{rig}-{polecat}
cmd := exec.Command("tmux", "list-sessions", "-F", "#{session_name}|#{session_activity}")
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return nil
}
var mostRecent time.Time
for _, line := range strings.Split(stdout.String(), "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
parts := strings.Split(line, "|")
if len(parts) < 2 {
continue
}
sessionName := parts[0]
// Check if it's a polecat session (gt-{rig}-{polecat}, not gt-{rig}-witness/refinery)
// Polecat sessions have exactly 3 parts when split by "-" and the middle part is the rig
nameParts := strings.Split(sessionName, "-")
if len(nameParts) < 3 || nameParts[0] != "gt" {
continue
}
// Skip witness, refinery, mayor, deacon sessions
lastPart := nameParts[len(nameParts)-1]
if lastPart == "witness" || lastPart == "refinery" || lastPart == "mayor" || lastPart == "deacon" {
continue
}
var activityUnix int64
if _, err := fmt.Sscanf(parts[1], "%d", &activityUnix); err != nil || activityUnix == 0 {
continue
}
activityTime := time.Unix(activityUnix, 0)
if activityTime.After(mostRecent) {
mostRecent = activityTime
}
}
if mostRecent.IsZero() {
return nil
}
return &mostRecent
}
// FetchMergeQueue fetches open PRs from configured repos.
func (f *LiveConvoyFetcher) FetchMergeQueue() ([]MergeQueueRow, error) {
// Repos to query for PRs
repos := []struct {
Full string // Full repo path for gh CLI
Short string // Short name for display
}{
{"michaellady/roxas", "roxas"},
{"michaellady/gastown", "gastown"},
}
var result []MergeQueueRow
for _, repo := range repos {
prs, err := f.fetchPRsForRepo(repo.Full, repo.Short)
if err != nil {
// Non-fatal: continue with other repos
continue
}
result = append(result, prs...)
}
return result, nil
}
// prResponse represents the JSON response from gh pr list.
type prResponse struct {
Number int `json:"number"`
Title string `json:"title"`
URL string `json:"url"`
Mergeable string `json:"mergeable"`
StatusCheckRollup []struct {
State string `json:"state"`
Status string `json:"status"`
Conclusion string `json:"conclusion"`
} `json:"statusCheckRollup"`
}
// fetchPRsForRepo fetches open PRs for a single repo.
func (f *LiveConvoyFetcher) fetchPRsForRepo(repoFull, repoShort string) ([]MergeQueueRow, error) {
// #nosec G204 -- gh is a trusted CLI, repo is from hardcoded list
cmd := exec.Command("gh", "pr", "list",
"--repo", repoFull,
"--state", "open",
"--json", "number,title,url,mergeable,statusCheckRollup")
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("fetching PRs for %s: %w", repoFull, err)
}
var prs []prResponse
if err := json.Unmarshal(stdout.Bytes(), &prs); err != nil {
return nil, fmt.Errorf("parsing PRs for %s: %w", repoFull, err)
}
result := make([]MergeQueueRow, 0, len(prs))
for _, pr := range prs {
row := MergeQueueRow{
Number: pr.Number,
Repo: repoShort,
Title: pr.Title,
URL: pr.URL,
}
// Determine CI status from statusCheckRollup
row.CIStatus = determineCIStatus(pr.StatusCheckRollup)
// Determine mergeable status
row.Mergeable = determineMergeableStatus(pr.Mergeable)
// Determine color class based on overall status
row.ColorClass = determineColorClass(row.CIStatus, row.Mergeable)
result = append(result, row)
}
return result, nil
}
// determineCIStatus evaluates the overall CI status from status checks.
func determineCIStatus(checks []struct {
State string `json:"state"`
Status string `json:"status"`
Conclusion string `json:"conclusion"`
}) string {
if len(checks) == 0 {
return "pending"
}
hasFailure := false
hasPending := false
for _, check := range checks {
// Check conclusion first (for completed checks)
switch check.Conclusion {
case "failure", "cancelled", "timed_out", "action_required":
hasFailure = true
case "success", "skipped", "neutral":
// Pass
default:
// Check status for in-progress checks
switch check.Status {
case "queued", "in_progress", "waiting", "pending", "requested":
hasPending = true
}
// Also check state field
switch check.State {
case "FAILURE", "ERROR":
hasFailure = true
case "PENDING", "EXPECTED":
hasPending = true
}
}
}
if hasFailure {
return "fail"
}
if hasPending {
return "pending"
}
return "pass"
}
// determineMergeableStatus converts GitHub's mergeable field to display value.
func determineMergeableStatus(mergeable string) string {
switch strings.ToUpper(mergeable) {
case "MERGEABLE":
return "ready"
case "CONFLICTING":
return "conflict"
default:
return "pending"
}
}
// determineColorClass determines the row color based on CI and merge status.
func determineColorClass(ciStatus, mergeable string) string {
if ciStatus == "fail" || mergeable == "conflict" {
return "mq-red"
}
if ciStatus == "pending" || mergeable == "pending" {
return "mq-yellow"
}
if ciStatus == "pass" && mergeable == "ready" {
return "mq-green"
}
return "mq-yellow"
}
// FetchPolecats fetches all running polecat sessions with activity data.
func (f *LiveConvoyFetcher) FetchPolecats() ([]PolecatRow, error) {
// Query all tmux sessions
cmd := exec.Command("tmux", "list-sessions", "-F", "#{session_name}|#{session_activity}")
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
// tmux not running or no sessions
return nil, nil
}
var polecats []PolecatRow
lines := strings.Split(strings.TrimSpace(stdout.String()), "\n")
for _, line := range lines {
if line == "" {
continue
}
parts := strings.Split(line, "|")
if len(parts) < 2 {
continue
}
sessionName := parts[0]
// Filter for gt-<rig>-<polecat> pattern
if !strings.HasPrefix(sessionName, "gt-") {
continue
}
// Parse session name: gt-roxas-dag -> rig=roxas, polecat=dag
nameParts := strings.SplitN(sessionName, "-", 3)
if len(nameParts) != 3 {
continue
}
rig := nameParts[1]
polecat := nameParts[2]
// Skip non-polecat sessions (refinery, witness, mayor, deacon, boot)
if polecat == "refinery" || polecat == "witness" || polecat == "mayor" || polecat == "deacon" || polecat == "boot" {
continue
}
// Parse activity timestamp
var activityUnix int64
if _, err := fmt.Sscanf(parts[1], "%d", &activityUnix); err != nil || activityUnix == 0 {
continue
}
activityTime := time.Unix(activityUnix, 0)
// Get status hint from last line of pane
statusHint := f.getPolecatStatusHint(sessionName)
polecats = append(polecats, PolecatRow{
Name: polecat,
Rig: rig,
SessionID: sessionName,
LastActivity: activity.Calculate(activityTime),
StatusHint: statusHint,
})
}
return polecats, nil
}
// getPolecatStatusHint captures the last non-empty line from a polecat's pane.
func (f *LiveConvoyFetcher) getPolecatStatusHint(sessionName string) string {
cmd := exec.Command("tmux", "capture-pane", "-t", sessionName, "-p", "-J")
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return ""
}
// Get last non-empty line
lines := strings.Split(stdout.String(), "\n")
for i := len(lines) - 1; i >= 0; i-- {
line := strings.TrimSpace(lines[i])
if line != "" {
// Truncate long lines
if len(line) > 60 {
line = line[:57] + "..."
}
return line
}
}
return ""
}