diff --git a/internal/cmd/feed.go b/internal/cmd/feed.go index 74857d14..a55a0929 100644 --- a/internal/cmd/feed.go +++ b/internal/cmd/feed.go @@ -65,6 +65,12 @@ Event symbols: ✗ failed - Step or issue failed ⊘ deleted - Issue removed +MQ (Merge Queue) event symbols: + ⚙ merge_started - Refinery began processing an MR + ✓ merged - MR successfully merged (green) + ✗ merge_failed - Merge failed (conflict, tests, etc.) (red) + ⊘ merge_skipped - MR skipped (already merged, etc.) + Examples: gt feed # Launch TUI dashboard gt feed --plain # Plain text output (bd activity) @@ -181,16 +187,28 @@ func runFeedDirect(workDir string, bdArgs []string) error { // runFeedTUI runs the interactive TUI feed. func runFeedTUI(workDir string) error { + var sources []feed.EventSource + // Create event source from bd activity - source, err := feed.NewBdActivitySource(workDir) + bdSource, err := feed.NewBdActivitySource(workDir) if err != nil { - return fmt.Errorf("creating event source: %w", err) + return fmt.Errorf("creating bd activity source: %w", err) } - defer source.Close() + sources = append(sources, bdSource) + + // Create MQ event source (optional - don't fail if not available) + mqSource, err := feed.NewMQEventSourceFromWorkDir(workDir) + if err == nil { + sources = append(sources, mqSource) + } + + // Combine all sources + multiSource := feed.NewMultiSource(sources...) + defer multiSource.Close() // Create model and connect event source m := feed.NewModel() - m.SetEventChannel(source.Events()) + m.SetEventChannel(multiSource.Events()) // Run the TUI p := tea.NewProgram(m, tea.WithAltScreen()) diff --git a/internal/mrqueue/events.go b/internal/mrqueue/events.go new file mode 100644 index 00000000..92660d45 --- /dev/null +++ b/internal/mrqueue/events.go @@ -0,0 +1,152 @@ +// Package mrqueue provides merge request queue storage and events. +package mrqueue + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +// EventType represents the type of MQ lifecycle event. +type EventType string + +const ( + // EventMergeStarted indicates refinery began processing an MR. + EventMergeStarted EventType = "merge_started" + // EventMerged indicates an MR was successfully merged. + EventMerged EventType = "merged" + // EventMergeFailed indicates a merge failed (conflict, tests, etc.). + EventMergeFailed EventType = "merge_failed" + // EventMergeSkipped indicates an MR was skipped (already merged, etc.). + EventMergeSkipped EventType = "merge_skipped" +) + +// Event represents a single MQ lifecycle event. +type Event struct { + Timestamp time.Time `json:"timestamp"` + Type EventType `json:"type"` + MRID string `json:"mr_id"` + Branch string `json:"branch"` + Target string `json:"target"` + Worker string `json:"worker,omitempty"` + SourceIssue string `json:"source_issue,omitempty"` + Rig string `json:"rig,omitempty"` + MergeCommit string `json:"merge_commit,omitempty"` // For merged events + Reason string `json:"reason,omitempty"` // For failed/skipped events +} + +// EventLogger handles writing MQ events to the event log. +type EventLogger struct { + logPath string + mu sync.Mutex +} + +// NewEventLogger creates a new EventLogger for the given beads directory. +func NewEventLogger(beadsDir string) *EventLogger { + return &EventLogger{ + logPath: filepath.Join(beadsDir, "mq_events.jsonl"), + } +} + +// NewEventLoggerFromRig creates an EventLogger for the given rig path. +func NewEventLoggerFromRig(rigPath string) *EventLogger { + return NewEventLogger(filepath.Join(rigPath, ".beads")) +} + +// LogEvent writes an event to the MQ event log. +func (l *EventLogger) LogEvent(event Event) error { + l.mu.Lock() + defer l.mu.Unlock() + + // Ensure timestamp is set + if event.Timestamp.IsZero() { + event.Timestamp = time.Now() + } + + // Ensure log directory exists + if err := os.MkdirAll(filepath.Dir(l.logPath), 0755); err != nil { + return fmt.Errorf("creating log directory: %w", err) + } + + // Marshal event to JSON + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("marshaling event: %w", err) + } + + // Append to log file + f, err := os.OpenFile(l.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("opening event log: %w", err) + } + defer f.Close() + + if _, err := f.Write(append(data, '\n')); err != nil { + return fmt.Errorf("writing event: %w", err) + } + + return nil +} + +// LogMergeStarted logs a merge_started event. +func (l *EventLogger) LogMergeStarted(mr *MR) error { + return l.LogEvent(Event{ + Type: EventMergeStarted, + MRID: mr.ID, + Branch: mr.Branch, + Target: mr.Target, + Worker: mr.Worker, + SourceIssue: mr.SourceIssue, + Rig: mr.Rig, + }) +} + +// LogMerged logs a merged event. +func (l *EventLogger) LogMerged(mr *MR, mergeCommit string) error { + return l.LogEvent(Event{ + Type: EventMerged, + MRID: mr.ID, + Branch: mr.Branch, + Target: mr.Target, + Worker: mr.Worker, + SourceIssue: mr.SourceIssue, + Rig: mr.Rig, + MergeCommit: mergeCommit, + }) +} + +// LogMergeFailed logs a merge_failed event. +func (l *EventLogger) LogMergeFailed(mr *MR, reason string) error { + return l.LogEvent(Event{ + Type: EventMergeFailed, + MRID: mr.ID, + Branch: mr.Branch, + Target: mr.Target, + Worker: mr.Worker, + SourceIssue: mr.SourceIssue, + Rig: mr.Rig, + Reason: reason, + }) +} + +// LogMergeSkipped logs a merge_skipped event. +func (l *EventLogger) LogMergeSkipped(mr *MR, reason string) error { + return l.LogEvent(Event{ + Type: EventMergeSkipped, + MRID: mr.ID, + Branch: mr.Branch, + Target: mr.Target, + Worker: mr.Worker, + SourceIssue: mr.SourceIssue, + Rig: mr.Rig, + Reason: reason, + }) +} + +// LogPath returns the path to the event log file. +func (l *EventLogger) LogPath() string { + return l.logPath +} diff --git a/internal/mrqueue/events_test.go b/internal/mrqueue/events_test.go new file mode 100644 index 00000000..cc5ed668 --- /dev/null +++ b/internal/mrqueue/events_test.go @@ -0,0 +1,114 @@ +package mrqueue + +import ( + "encoding/json" + "os" + "path/filepath" + "testing" + "time" +) + +func TestEventLogger(t *testing.T) { + // Create temp directory + tmpDir, err := os.MkdirTemp("", "mrqueue-test") + if err != nil { + t.Fatalf("Failed to create temp dir: %v", err) + } + defer os.RemoveAll(tmpDir) + + beadsDir := filepath.Join(tmpDir, ".beads") + if err := os.MkdirAll(beadsDir, 0755); err != nil { + t.Fatalf("Failed to create beads dir: %v", err) + } + + logger := NewEventLogger(beadsDir) + + // Test MR + mr := &MR{ + ID: "mr-test-123", + Branch: "polecat/test", + Target: "main", + SourceIssue: "gt-abc", + Worker: "test-worker", + Rig: "test-rig", + } + + // Log merge_started + if err := logger.LogMergeStarted(mr); err != nil { + t.Errorf("LogMergeStarted failed: %v", err) + } + + // Log merged + if err := logger.LogMerged(mr, "abc123def456"); err != nil { + t.Errorf("LogMerged failed: %v", err) + } + + // Log merge_failed + if err := logger.LogMergeFailed(mr, "conflict in file.go"); err != nil { + t.Errorf("LogMergeFailed failed: %v", err) + } + + // Log merge_skipped + if err := logger.LogMergeSkipped(mr, "already merged"); err != nil { + t.Errorf("LogMergeSkipped failed: %v", err) + } + + // Read and verify events + logPath := logger.LogPath() + data, err := os.ReadFile(logPath) + if err != nil { + t.Fatalf("Failed to read log file: %v", err) + } + + lines := splitLines(string(data)) + if len(lines) != 4 { + t.Errorf("Expected 4 events, got %d", len(lines)) + } + + // Verify each event type + expectedTypes := []EventType{EventMergeStarted, EventMerged, EventMergeFailed, EventMergeSkipped} + for i, line := range lines { + if line == "" { + continue + } + var event Event + if err := json.Unmarshal([]byte(line), &event); err != nil { + t.Errorf("Failed to parse event %d: %v", i, err) + continue + } + + if event.Type != expectedTypes[i] { + t.Errorf("Event %d: expected type %s, got %s", i, expectedTypes[i], event.Type) + } + + if event.MRID != mr.ID { + t.Errorf("Event %d: expected MR ID %s, got %s", i, mr.ID, event.MRID) + } + + if event.Branch != mr.Branch { + t.Errorf("Event %d: expected branch %s, got %s", i, mr.Branch, event.Branch) + } + + // Check timestamp is recent + if time.Since(event.Timestamp) > time.Minute { + t.Errorf("Event %d: timestamp too old: %v", i, event.Timestamp) + } + } +} + +func splitLines(s string) []string { + var lines []string + start := 0 + for i := 0; i < len(s); i++ { + if s[i] == '\n' { + if start < i { + lines = append(lines, s[start:i]) + } + start = i + 1 + } + } + if start < len(s) { + lines = append(lines, s[start:]) + } + return lines +} diff --git a/internal/mrqueue/mrqueue.go b/internal/mrqueue/mrqueue.go new file mode 100644 index 00000000..5ae5cac8 --- /dev/null +++ b/internal/mrqueue/mrqueue.go @@ -0,0 +1,183 @@ +// Package mrqueue provides merge request queue storage. +// MRs are stored locally in .beads/mq/ and deleted after merge. +// This avoids sync overhead for transient MR state. +package mrqueue + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +// MR represents a merge request in the queue. +type MR struct { + ID string `json:"id"` + Branch string `json:"branch"` // Source branch (e.g., "polecat/nux") + Target string `json:"target"` // Target branch (e.g., "main") + SourceIssue string `json:"source_issue"` // The work item being merged + Worker string `json:"worker"` // Who did the work + Rig string `json:"rig"` // Which rig + Title string `json:"title"` // MR title + Priority int `json:"priority"` // Priority (lower = higher priority) + CreatedAt time.Time `json:"created_at"` +} + +// Queue manages the MR storage. +type Queue struct { + dir string // .beads/mq/ directory +} + +// New creates a new MR queue for the given rig path. +func New(rigPath string) *Queue { + return &Queue{ + dir: filepath.Join(rigPath, ".beads", "mq"), + } +} + +// NewFromWorkdir creates a queue by finding the rig root from a working directory. +func NewFromWorkdir(workdir string) (*Queue, error) { + // Walk up to find .beads or rig root + dir := workdir + for { + beadsDir := filepath.Join(dir, ".beads") + if info, err := os.Stat(beadsDir); err == nil && info.IsDir() { + return &Queue{dir: filepath.Join(beadsDir, "mq")}, nil + } + + parent := filepath.Dir(dir) + if parent == dir { + return nil, fmt.Errorf("could not find .beads directory from %s", workdir) + } + dir = parent + } +} + +// EnsureDir creates the MQ directory if it doesn't exist. +func (q *Queue) EnsureDir() error { + return os.MkdirAll(q.dir, 0755) +} + +// generateID creates a unique MR ID. +func generateID() string { + b := make([]byte, 4) + rand.Read(b) + return fmt.Sprintf("mr-%d-%s", time.Now().Unix(), hex.EncodeToString(b)) +} + +// Submit adds a new MR to the queue. +func (q *Queue) Submit(mr *MR) error { + if err := q.EnsureDir(); err != nil { + return fmt.Errorf("creating mq directory: %w", err) + } + + if mr.ID == "" { + mr.ID = generateID() + } + if mr.CreatedAt.IsZero() { + mr.CreatedAt = time.Now() + } + + data, err := json.MarshalIndent(mr, "", " ") + if err != nil { + return fmt.Errorf("marshaling MR: %w", err) + } + + path := filepath.Join(q.dir, mr.ID+".json") + if err := os.WriteFile(path, data, 0644); err != nil { + return fmt.Errorf("writing MR file: %w", err) + } + + return nil +} + +// List returns all pending MRs, sorted by priority then creation time. +func (q *Queue) List() ([]*MR, error) { + entries, err := os.ReadDir(q.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil // Empty queue + } + return nil, fmt.Errorf("reading mq directory: %w", err) + } + + var mrs []*MR + for _, entry := range entries { + if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { + continue + } + + mr, err := q.load(filepath.Join(q.dir, entry.Name())) + if err != nil { + continue // Skip malformed files + } + mrs = append(mrs, mr) + } + + // Sort by priority (lower first), then by creation time (older first) + sort.Slice(mrs, func(i, j int) bool { + if mrs[i].Priority != mrs[j].Priority { + return mrs[i].Priority < mrs[j].Priority + } + return mrs[i].CreatedAt.Before(mrs[j].CreatedAt) + }) + + return mrs, nil +} + +// Get retrieves a specific MR by ID. +func (q *Queue) Get(id string) (*MR, error) { + path := filepath.Join(q.dir, id+".json") + return q.load(path) +} + +// load reads an MR from a file path. +func (q *Queue) load(path string) (*MR, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var mr MR + if err := json.Unmarshal(data, &mr); err != nil { + return nil, err + } + + return &mr, nil +} + +// Remove deletes an MR from the queue (after successful merge). +func (q *Queue) Remove(id string) error { + path := filepath.Join(q.dir, id+".json") + err := os.Remove(path) + if os.IsNotExist(err) { + return nil // Already removed + } + return err +} + +// Count returns the number of pending MRs. +func (q *Queue) Count() int { + entries, err := os.ReadDir(q.dir) + if err != nil { + return 0 + } + + count := 0 + for _, entry := range entries { + if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".json") { + count++ + } + } + return count +} + +// Dir returns the queue directory path. +func (q *Queue) Dir() string { + return q.dir +} diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index 862f1975..ea52f0e4 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -12,6 +12,7 @@ import ( "github.com/steveyegge/gastown/internal/beads" "github.com/steveyegge/gastown/internal/git" + "github.com/steveyegge/gastown/internal/mrqueue" "github.com/steveyegge/gastown/internal/rig" ) @@ -67,23 +68,31 @@ func DefaultMergeQueueConfig() *MergeQueueConfig { // Engineer is the merge queue processor that polls for ready merge-requests // and processes them according to the merge queue design. type Engineer struct { - rig *rig.Rig - beads *beads.Beads - git *git.Git - config *MergeQueueConfig - workDir string - output io.Writer // Output destination for user-facing messages + rig *rig.Rig + beads *beads.Beads + mrQueue *mrqueue.Queue + git *git.Git + config *MergeQueueConfig + workDir string + output io.Writer // Output destination for user-facing messages + eventLogger *mrqueue.EventLogger + + // stopCh is used for graceful shutdown + stopCh chan struct{} } // NewEngineer creates a new Engineer for the given rig. func NewEngineer(r *rig.Rig) *Engineer { return &Engineer{ - rig: r, - beads: beads.New(r.Path), - git: git.NewGit(r.Path), - config: DefaultMergeQueueConfig(), - workDir: r.Path, - output: os.Stdout, + rig: r, + beads: beads.New(r.Path), + mrQueue: mrqueue.New(r.Path), + git: git.NewGit(r.Path), + config: DefaultMergeQueueConfig(), + workDir: r.Path, + output: os.Stdout, + eventLogger: mrqueue.NewEventLoggerFromRig(r.Path), + stopCh: make(chan struct{}), } } @@ -280,3 +289,73 @@ func (e *Engineer) handleFailure(mr *beads.Issue, result ProcessResult) { // Full failure handling (assign back to worker, labels) in gt-3x1.4 } + +// ProcessMRFromQueue processes a merge request from wisp queue. +func (e *Engineer) ProcessMRFromQueue(ctx context.Context, mr *mrqueue.MR) ProcessResult { + // MR fields are directly on the struct (no parsing needed) + fmt.Fprintln(e.output, "[Engineer] Processing MR from queue:") + fmt.Fprintf(e.output, " Branch: %s\n", mr.Branch) + fmt.Fprintf(e.output, " Target: %s\n", mr.Target) + fmt.Fprintf(e.output, " Worker: %s\n", mr.Worker) + fmt.Fprintf(e.output, " Source: %s\n", mr.SourceIssue) + + // Emit merge_started event + if err := e.eventLogger.LogMergeStarted(mr); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_started event: %v\n", err) + } + + // TODO: Actual merge implementation + // For now, return failure - actual implementation in gt-3x1.2 + return ProcessResult{ + Success: false, + Error: "ProcessMRFromQueue not fully implemented (see gt-3x1.2)", + } +} + +// handleSuccessFromQueue handles a successful merge from wisp queue. +func (e *Engineer) handleSuccessFromQueue(mr *mrqueue.MR, result ProcessResult) { + // Emit merged event + if err := e.eventLogger.LogMerged(mr, result.MergeCommit); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merged event: %v\n", err) + } + + // 1. Close source issue with reference to MR + if mr.SourceIssue != "" { + closeReason := fmt.Sprintf("Merged in %s", mr.ID) + if err := e.beads.CloseWithReason(closeReason, mr.SourceIssue); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to close source issue %s: %v\n", mr.SourceIssue, err) + } else { + fmt.Fprintf(e.output, "[Engineer] Closed source issue: %s\n", mr.SourceIssue) + } + } + + // 2. Delete source branch if configured (local only) + if e.config.DeleteMergedBranches && mr.Branch != "" { + if err := e.git.DeleteBranch(mr.Branch, true); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to delete branch %s: %v\n", mr.Branch, err) + } else { + fmt.Fprintf(e.output, "[Engineer] Deleted local branch: %s\n", mr.Branch) + } + } + + // 3. Remove MR from queue (ephemeral - just delete the file) + if err := e.mrQueue.Remove(mr.ID); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to remove MR from queue: %v\n", err) + } + + // 4. Log success + fmt.Fprintf(e.output, "[Engineer] ✓ Merged: %s (commit: %s)\n", mr.ID, result.MergeCommit) +} + +// handleFailureFromQueue handles a failed merge from wisp queue. +func (e *Engineer) handleFailureFromQueue(mr *mrqueue.MR, result ProcessResult) { + // Emit merge_failed event + if err := e.eventLogger.LogMergeFailed(mr, result.Error); err != nil { + fmt.Fprintf(e.output, "[Engineer] Warning: failed to log merge_failed event: %v\n", err) + } + + // MR stays in queue for retry - no action needed on the file + // Log the failure + fmt.Fprintf(e.output, "[Engineer] ✗ Failed: %s - %s\n", mr.ID, result.Error) + fmt.Fprintln(e.output, "[Engineer] MR remains in queue for retry") +} diff --git a/internal/tui/feed/mq_source.go b/internal/tui/feed/mq_source.go new file mode 100644 index 00000000..ed1f5b9d --- /dev/null +++ b/internal/tui/feed/mq_source.go @@ -0,0 +1,189 @@ +package feed + +import ( + "bufio" + "context" + "encoding/json" + "os" + "path/filepath" + "strings" + "time" + + "github.com/steveyegge/gastown/internal/mrqueue" +) + +// MQEventSource reads MQ lifecycle events from mq_events.jsonl +type MQEventSource struct { + file *os.File + events chan Event + cancel context.CancelFunc + logPath string +} + +// NewMQEventSource creates a source that tails MQ events from a beads directory. +func NewMQEventSource(beadsDir string) (*MQEventSource, error) { + logPath := filepath.Join(beadsDir, "mq_events.jsonl") + + // Create file if it doesn't exist + if _, err := os.Stat(logPath); os.IsNotExist(err) { + // Ensure directory exists + if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil { + return nil, err + } + // Create empty file + f, err := os.Create(logPath) + if err != nil { + return nil, err + } + f.Close() + } + + file, err := os.Open(logPath) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(context.Background()) + + source := &MQEventSource{ + file: file, + events: make(chan Event, 100), + cancel: cancel, + logPath: logPath, + } + + go source.tail(ctx) + + return source, nil +} + +// NewMQEventSourceFromWorkDir creates an MQ event source by finding the beads directory. +func NewMQEventSourceFromWorkDir(workDir string) (*MQEventSource, error) { + beadsDir, err := FindBeadsDir(workDir) + if err != nil { + return nil, err + } + return NewMQEventSource(beadsDir) +} + +// tail follows the MQ event log file and sends events. +func (s *MQEventSource) tail(ctx context.Context) { + defer close(s.events) + + // Seek to end for live tailing + s.file.Seek(0, 2) + + scanner := bufio.NewScanner(s.file) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + for scanner.Scan() { + line := scanner.Text() + if event := parseMQEventLine(line); event != nil { + select { + case s.events <- *event: + default: + // Drop event if channel full + } + } + } + } + } +} + +// Events returns the event channel. +func (s *MQEventSource) Events() <-chan Event { + return s.events +} + +// Close stops the source. +func (s *MQEventSource) Close() error { + s.cancel() + return s.file.Close() +} + +// parseMQEventLine parses a line from mq_events.jsonl into a feed Event. +func parseMQEventLine(line string) *Event { + if strings.TrimSpace(line) == "" { + return nil + } + + var mqEvent mrqueue.Event + if err := json.Unmarshal([]byte(line), &mqEvent); err != nil { + return nil + } + + // Convert MQ event to feed Event + feedType := mapMQEventType(mqEvent.Type) + message := formatMQEventMessage(mqEvent) + + return &Event{ + Time: mqEvent.Timestamp, + Type: feedType, + Actor: "refinery", + Target: mqEvent.MRID, + Message: message, + Rig: mqEvent.Rig, + Role: "refinery", + Raw: line, + } +} + +// mapMQEventType maps MQ event types to feed event types. +func mapMQEventType(mqType mrqueue.EventType) string { + switch mqType { + case mrqueue.EventMergeStarted: + return "merge_started" + case mrqueue.EventMerged: + return "merged" + case mrqueue.EventMergeFailed: + return "merge_failed" + case mrqueue.EventMergeSkipped: + return "merge_skipped" + default: + return string(mqType) + } +} + +// formatMQEventMessage creates a human-readable message for an MQ event. +func formatMQEventMessage(e mrqueue.Event) string { + branchInfo := e.Branch + if e.Target != "" { + branchInfo += " -> " + e.Target + } + + switch e.Type { + case mrqueue.EventMergeStarted: + return "Merge started: " + branchInfo + case mrqueue.EventMerged: + msg := "Merged: " + branchInfo + if e.MergeCommit != "" { + // Show short commit SHA + sha := e.MergeCommit + if len(sha) > 8 { + sha = sha[:8] + } + msg += " (" + sha + ")" + } + return msg + case mrqueue.EventMergeFailed: + msg := "Merge failed: " + branchInfo + if e.Reason != "" { + msg += " - " + e.Reason + } + return msg + case mrqueue.EventMergeSkipped: + msg := "Merge skipped: " + branchInfo + if e.Reason != "" { + msg += " - " + e.Reason + } + return msg + default: + return string(e.Type) + ": " + branchInfo + } +} diff --git a/internal/tui/feed/mq_source_test.go b/internal/tui/feed/mq_source_test.go new file mode 100644 index 00000000..b1b2358b --- /dev/null +++ b/internal/tui/feed/mq_source_test.go @@ -0,0 +1,144 @@ +package feed + +import ( + "encoding/json" + "testing" + "time" + + "github.com/steveyegge/gastown/internal/mrqueue" +) + +func TestParseMQEventLine(t *testing.T) { + tests := []struct { + name string + event mrqueue.Event + wantType string + wantTarget string + wantContains string // Substring in message + }{ + { + name: "merge_started", + event: mrqueue.Event{ + Timestamp: time.Now(), + Type: mrqueue.EventMergeStarted, + MRID: "mr-123", + Branch: "polecat/nux", + Target: "main", + Worker: "nux", + Rig: "gastown", + }, + wantType: "merge_started", + wantTarget: "mr-123", + wantContains: "Merge started", + }, + { + name: "merged", + event: mrqueue.Event{ + Timestamp: time.Now(), + Type: mrqueue.EventMerged, + MRID: "mr-456", + Branch: "polecat/toast", + Target: "main", + Worker: "toast", + Rig: "gastown", + MergeCommit: "abc123def456789", + }, + wantType: "merged", + wantTarget: "mr-456", + wantContains: "abc123de", // Short SHA + }, + { + name: "merge_failed", + event: mrqueue.Event{ + Timestamp: time.Now(), + Type: mrqueue.EventMergeFailed, + MRID: "mr-789", + Branch: "polecat/capable", + Target: "main", + Worker: "capable", + Rig: "gastown", + Reason: "conflict in main.go", + }, + wantType: "merge_failed", + wantTarget: "mr-789", + wantContains: "conflict in main.go", + }, + { + name: "merge_skipped", + event: mrqueue.Event{ + Timestamp: time.Now(), + Type: mrqueue.EventMergeSkipped, + MRID: "mr-999", + Branch: "polecat/skip", + Target: "main", + Reason: "already merged", + }, + wantType: "merge_skipped", + wantTarget: "mr-999", + wantContains: "already merged", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Marshal to JSON line + data, err := json.Marshal(tt.event) + if err != nil { + t.Fatalf("Failed to marshal event: %v", err) + } + + // Parse the line + result := parseMQEventLine(string(data)) + if result == nil { + t.Fatal("parseMQEventLine returned nil") + } + + if result.Type != tt.wantType { + t.Errorf("Type = %q, want %q", result.Type, tt.wantType) + } + + if result.Target != tt.wantTarget { + t.Errorf("Target = %q, want %q", result.Target, tt.wantTarget) + } + + if tt.wantContains != "" && !contains(result.Message, tt.wantContains) { + t.Errorf("Message = %q, want to contain %q", result.Message, tt.wantContains) + } + + // Actor should be refinery + if result.Actor != "refinery" { + t.Errorf("Actor = %q, want %q", result.Actor, "refinery") + } + + if result.Role != "refinery" { + t.Errorf("Role = %q, want %q", result.Role, "refinery") + } + }) + } +} + +func TestParseMQEventLineEmpty(t *testing.T) { + result := parseMQEventLine("") + if result != nil { + t.Error("Expected nil for empty line") + } + + result = parseMQEventLine(" ") + if result != nil { + t.Error("Expected nil for whitespace-only line") + } + + result = parseMQEventLine("not valid json") + if result != nil { + t.Error("Expected nil for invalid JSON") + } +} + +func contains(s, substr string) bool { + for i := 0; i <= len(s)-len(substr); i++ { + if s[i:i+len(substr)] == substr { + return true + } + } + return false +} diff --git a/internal/tui/feed/multi_source.go b/internal/tui/feed/multi_source.go new file mode 100644 index 00000000..c03cad0f --- /dev/null +++ b/internal/tui/feed/multi_source.go @@ -0,0 +1,80 @@ +package feed + +import ( + "sync" +) + +// MultiSource combines events from multiple EventSources into a single stream. +type MultiSource struct { + sources []EventSource + events chan Event + done chan struct{} + wg sync.WaitGroup +} + +// NewMultiSource creates a new multi-source that combines events from all given sources. +func NewMultiSource(sources ...EventSource) *MultiSource { + m := &MultiSource{ + sources: sources, + events: make(chan Event, 100), + done: make(chan struct{}), + } + + // Start a goroutine for each source to forward events + for _, src := range sources { + if src == nil { + continue + } + m.wg.Add(1) + go m.forwardEvents(src) + } + + // Close events channel when all sources are done + go func() { + m.wg.Wait() + close(m.events) + }() + + return m +} + +// forwardEvents reads from a source and forwards to the combined channel. +func (m *MultiSource) forwardEvents(src EventSource) { + defer m.wg.Done() + + srcEvents := src.Events() + for { + select { + case event, ok := <-srcEvents: + if !ok { + return + } + select { + case m.events <- event: + case <-m.done: + return + } + case <-m.done: + return + } + } +} + +// Events returns the combined event channel. +func (m *MultiSource) Events() <-chan Event { + return m.events +} + +// Close stops all sources. +func (m *MultiSource) Close() error { + close(m.done) + var lastErr error + for _, src := range m.sources { + if src != nil { + if err := src.Close(); err != nil { + lastErr = err + } + } + } + return lastErr +} diff --git a/internal/tui/feed/styles.go b/internal/tui/feed/styles.go index 6792db84..b469e6be 100644 --- a/internal/tui/feed/styles.go +++ b/internal/tui/feed/styles.go @@ -106,13 +106,32 @@ var ( "deacon": "🔔", } + // MQ event styles + EventMergeStartedStyle = lipgloss.NewStyle(). + Foreground(colorPrimary) + + EventMergedStyle = lipgloss.NewStyle(). + Foreground(colorSuccess). + Bold(true) + + EventMergeFailedStyle = lipgloss.NewStyle(). + Foreground(colorError). + Bold(true) + + EventMergeSkippedStyle = lipgloss.NewStyle(). + Foreground(colorWarning) + // Event symbols EventSymbols = map[string]string{ - "create": "+", - "update": "→", - "complete": "✓", - "fail": "✗", - "delete": "⊘", - "pin": "📌", + "create": "+", + "update": "→", + "complete": "✓", + "fail": "✗", + "delete": "⊘", + "pin": "📌", + "merge_started": "⚙", + "merged": "✓", + "merge_failed": "✗", + "merge_skipped": "⊘", } ) diff --git a/internal/tui/feed/view.go b/internal/tui/feed/view.go index 828fd174..d7ab0b15 100644 --- a/internal/tui/feed/view.go +++ b/internal/tui/feed/view.go @@ -255,6 +255,14 @@ func (m *Model) renderEvent(e Event) string { symbolStyle = EventFailStyle case "delete": symbolStyle = EventDeleteStyle + case "merge_started": + symbolStyle = EventMergeStartedStyle + case "merged": + symbolStyle = EventMergedStyle + case "merge_failed": + symbolStyle = EventMergeFailedStyle + case "merge_skipped": + symbolStyle = EventMergeSkippedStyle default: symbolStyle = EventUpdateStyle }