Files
gastown/internal/tui/feed/mq_source.go
max 1b69576573 fix: Address golangci-lint errors (errcheck, gosec) (#76)
Apply PR #76 from dannomayernotabot:

- Add golangci exclusions for internal package false positives
- Tighten file permissions (0644 -> 0600) for sensitive files
- Add ReadHeaderTimeout to HTTP server (slowloris prevention)
- Explicit error ignoring with _ = for intentional cases
- Add //nolint comments with justifications
- Spelling: cancelled -> canceled (US locale)

Co-Authored-By: dannomayernotabot <noreply@github.com>

🤖 Generated with Claude Code
2026-01-03 16:11:55 -08:00

190 lines
4.1 KiB
Go

package feed
import (
	"bufio"
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/steveyegge/gastown/internal/mrqueue"
)
// MQEventSource reads MQ lifecycle events from mq_events.jsonl
// and exposes them as feed Events on a channel. Create one with
// NewMQEventSource and release it with Close.
type MQEventSource struct {
// file is the open handle on the event log that tail reads from.
file *os.File
// events carries parsed events to the consumer; tail closes it on exit.
events chan Event
// cancel cancels the context handed to the tail goroutine.
cancel context.CancelFunc
// logPath is the full path of the mq_events.jsonl file being followed.
logPath string
}
// NewMQEventSource creates a source that tails MQ events from a beads directory.
//
// The log is expected at <beadsDir>/mq_events.jsonl. If it does not exist yet,
// the directory and an empty log file are created so that tailing can start
// before the first event is written. On success a background goroutine is
// started to follow the file; call Close to stop it and release the handle.
//
// It returns an error if the directory cannot be created or the log cannot be
// opened.
func NewMQEventSource(beadsDir string) (*MQEventSource, error) {
	logPath := filepath.Join(beadsDir, "mq_events.jsonl")

	// Try the common case first: the log already exists.
	file, err := os.Open(logPath)
	if os.IsNotExist(err) {
		// Ensure directory exists
		if mkErr := os.MkdirAll(filepath.Dir(logPath), 0755); mkErr != nil {
			return nil, mkErr
		}
		// Create the log if it is still missing and open it in one step.
		// O_CREATE without O_TRUNC never discards data, so events appended by a
		// writer that created the file in the meantime are preserved (a separate
		// Stat + Create would truncate them). 0666 matches what os.Create uses.
		file, err = os.OpenFile(logPath, os.O_RDONLY|os.O_CREATE, 0666) //nolint:gosec // G302/G304: shared event log under the beads dir; same mode os.Create would apply
	}
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(context.Background())
	source := &MQEventSource{
		file:    file,
		events:  make(chan Event, 100),
		cancel:  cancel,
		logPath: logPath,
	}

	go source.tail(ctx)
	return source, nil
}
// NewMQEventSourceFromWorkDir resolves the beads directory for workDir via
// FindBeadsDir and then builds an MQ event source on top of it. Any error from
// the lookup or from NewMQEventSource is returned unchanged.
func NewMQEventSourceFromWorkDir(workDir string) (*MQEventSource, error) {
	dir, lookupErr := FindBeadsDir(workDir)
	if lookupErr != nil {
		return nil, lookupErr
	}

	src, openErr := NewMQEventSource(dir)
	return src, openErr
}
// tail follows the MQ event log file and sends events.
//
// It starts from the current end of the file, so only lines appended after the
// source was created are reported. Every 100ms it reads all complete lines that
// have been appended, converts each one with parseMQEventLine, and forwards the
// result on s.events without blocking; events are dropped while the channel is
// full. A trailing line that has no newline yet is held back until the rest of
// it arrives.
//
// The events channel is closed when tail returns, which happens when ctx is
// canceled or when reading the file fails with anything other than end-of-file.
func (s *MQEventSource) tail(ctx context.Context) {
	defer close(s.events)

	// Seek to end for live tailing. Best-effort: on failure we keep reading
	// from whatever offset the file is at.
	_, _ = s.file.Seek(0, io.SeekEnd)

	// A bufio.Scanner cannot be used here: once it has seen io.EOF it never
	// calls Read again, so it would report nothing after the initial seek.
	// bufio.Reader retries the underlying Read on every call.
	reader := bufio.NewReader(s.file)

	// pending accumulates an unterminated trailing line across ticks.
	var pending string

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Drain everything that is currently available.
		for {
			chunk, err := reader.ReadString('\n')
			pending += chunk
			if err == io.EOF {
				// Caught up; wait for the next tick.
				break
			}
			if err != nil {
				// File closed or unreadable: nothing more will arrive.
				return
			}

			// Strip the line terminator (LF or CRLF) before parsing.
			line := strings.TrimSuffix(strings.TrimSuffix(pending, "\n"), "\r")
			pending = ""

			if event := parseMQEventLine(line); event != nil {
				select {
				case s.events <- *event:
				default:
					// Drop event if channel full
				}
			}
		}
	}
}
// Events returns the event channel as a receive-only channel.
// The channel is closed by the tail goroutine when it exits, and events are
// dropped rather than queued when its buffer is full.
func (s *MQEventSource) Events() <-chan Event {
return s.events
}
// Close shuts the source down: it cancels the context driving the tail
// goroutine and then closes the underlying log file, returning whatever error
// the file close reports.
func (s *MQEventSource) Close() error {
	s.cancel()

	closeErr := s.file.Close()
	return closeErr
}
// parseMQEventLine turns one line of mq_events.jsonl into a feed Event.
//
// It returns nil for a blank (whitespace-only) line and for a line that does
// not decode as an mrqueue.Event. Otherwise the result carries the MQ event's
// timestamp, MR id and rig, a feed type from mapMQEventType, a message from
// formatMQEventMessage, "refinery" as both actor and role, and the original
// line in Raw.
func parseMQEventLine(line string) *Event {
	if len(strings.TrimSpace(line)) == 0 {
		return nil
	}

	var src mrqueue.Event
	if decodeErr := json.Unmarshal([]byte(line), &src); decodeErr != nil {
		return nil
	}

	// Translate the MQ record into the feed's own event shape.
	converted := Event{
		Time:    src.Timestamp,
		Type:    mapMQEventType(src.Type),
		Actor:   "refinery",
		Target:  src.MRID,
		Message: formatMQEventMessage(src),
		Rig:     src.Rig,
		Role:    "refinery",
		Raw:     line,
	}
	return &converted
}
// mapMQEventType translates an MQ event type into the feed's event type string.
// The four merge lifecycle types get fixed feed names; any other type is passed
// through as its own string value.
func mapMQEventType(mqType mrqueue.EventType) string {
	// Start from the pass-through value and override it for known types.
	feedType := string(mqType)

	switch mqType {
	case mrqueue.EventMergeStarted:
		feedType = "merge_started"
	case mrqueue.EventMerged:
		feedType = "merged"
	case mrqueue.EventMergeFailed:
		feedType = "merge_failed"
	case mrqueue.EventMergeSkipped:
		feedType = "merge_skipped"
	}

	return feedType
}
// formatMQEventMessage builds the human-readable feed message for an MQ event.
//
// Every message names the branch, followed by " -> <target>" when a target is
// set. Merged events append the merge commit (cut to its first 8 characters)
// in parentheses when present; failed and skipped events append " - <reason>"
// when a reason is present. Unrecognized types are rendered as
// "<type>: <branch info>".
func formatMQEventMessage(e mrqueue.Event) string {
	route := e.Branch
	if e.Target != "" {
		route = route + " -> " + e.Target
	}

	// withReason renders "<prefix><route>" plus the optional failure/skip reason.
	withReason := func(prefix string) string {
		if e.Reason == "" {
			return prefix + route
		}
		return prefix + route + " - " + e.Reason
	}

	switch e.Type {
	case mrqueue.EventMergeStarted:
		return "Merge started: " + route

	case mrqueue.EventMerged:
		if e.MergeCommit == "" {
			return "Merged: " + route
		}
		// Show short commit SHA
		short := e.MergeCommit
		if len(short) > 8 {
			short = short[:8]
		}
		return "Merged: " + route + " (" + short + ")"

	case mrqueue.EventMergeFailed:
		return withReason("Merge failed: ")

	case mrqueue.EventMergeSkipped:
		return withReason("Merge skipped: ")
	}

	return string(e.Type) + ": " + route
}