diff --git a/internal/feed/curator.go b/internal/feed/curator.go index bd0f7b04..bf9f2a66 100644 --- a/internal/feed/curator.go +++ b/internal/feed/curator.go @@ -16,6 +16,7 @@ import ( "io" "os" "path/filepath" + "strings" "sync" "time" @@ -37,22 +38,12 @@ type FeedEvent struct { } // Curator manages the feed curation process. +// ZFC: State is derived from the events file, not cached in memory. type Curator struct { townRoot string ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - - // Deduplication state - mu sync.Mutex - recentDone map[string]time.Time // actor → last done time (dedupe repeated done events) - recentSling map[string][]slingRecord // actor → recent slings (aggregate) - recentMail map[string]int // actor → mail count in window (aggregate) -} - -type slingRecord struct { - target string - ts time.Time } // Deduplication/aggregation settings @@ -74,12 +65,9 @@ const ( func NewCurator(townRoot string) *Curator { ctx, cancel := context.WithCancel(context.Background()) return &Curator{ - townRoot: townRoot, - ctx: ctx, - cancel: cancel, - recentDone: make(map[string]time.Time), - recentSling: make(map[string][]slingRecord), - recentMail: make(map[string]int), + townRoot: townRoot, + ctx: ctx, + cancel: cancel, } } @@ -112,6 +100,7 @@ func (c *Curator) Stop() { } // run is the main curator loop. +// ZFC: No in-memory state to clean up - state is derived from the events file. func (c *Curator) run(file *os.File) { defer c.wg.Done() defer file.Close() @@ -120,18 +109,11 @@ func (c *Curator) run(file *os.File) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - // Cleanup ticker for stale aggregation state - cleanupTicker := time.NewTicker(time.Minute) - defer cleanupTicker.Stop() - for { select { case <-c.ctx.Done(): return - case <-cleanupTicker.C: - c.cleanupStaleState() - case <-ticker.C: // Read available lines for { @@ -171,93 +153,131 @@ func (c *Curator) processLine(line string) { } // shouldDedupe checks if an event should be deduplicated. +// ZFC: Derives state from the FEED file (what we've already output), not in-memory cache. // Returns true if the event should be dropped. func (c *Curator) shouldDedupe(event *events.Event) bool { - c.mu.Lock() - defer c.mu.Unlock() - - now := time.Now() - switch event.Type { case events.TypeDone: // Dedupe repeated done events from same actor within window - if lastDone, ok := c.recentDone[event.Actor]; ok { - if now.Sub(lastDone) < doneDedupeWindow { - return true // Skip duplicate + // Check if we've already written a done event for this actor to the feed + recentFeedEvents := c.readRecentFeedEvents(doneDedupeWindow) + for _, e := range recentFeedEvents { + if e.Type == events.TypeDone && e.Actor == event.Actor { + return true // Skip duplicate (already in feed) } } - c.recentDone[event.Actor] = now - return false - - case events.TypeSling: - // Track for potential aggregation (but don't dedupe single slings) - target, _ := event.Payload["target"].(string) - c.recentSling[event.Actor] = append(c.recentSling[event.Actor], slingRecord{ - target: target, - ts: now, - }) - // Prune old records - c.recentSling[event.Actor] = c.pruneRecords(c.recentSling[event.Actor], slingAggregateWindow) - return false - - case events.TypeMail: - // Track mail count for potential aggregation - c.recentMail[event.Actor]++ - // Reset after window (rough approximation) - go func(actor string) { - time.Sleep(mailAggregateWindow) - c.mu.Lock() - defer c.mu.Unlock() - if c.recentMail[actor] > 0 { - c.recentMail[actor]-- - } - }(event.Actor) return false } + // Sling and mail events are not deduplicated, only aggregated in writeFeedEvent return false } -// pruneRecords removes records older than the window. -func (c *Curator) pruneRecords(records []slingRecord, window time.Duration) []slingRecord { - now := time.Now() - result := make([]slingRecord, 0, len(records)) - for _, r := range records { - if now.Sub(r.ts) < window { - result = append(result, r) - } +// readRecentFeedEvents reads feed events from the feed file within the given time window. +// ZFC: The feed file is the observable state of what we've already output. +func (c *Curator) readRecentFeedEvents(window time.Duration) []FeedEvent { + feedPath := filepath.Join(c.townRoot, FeedFile) + + data, err := os.ReadFile(feedPath) + if err != nil { + return nil } + + now := time.Now() + cutoff := now.Add(-window) + var result []FeedEvent + + // Parse lines from the end (most recent first) for efficiency + lines := strings.Split(string(data), "\n") + for i := len(lines) - 1; i >= 0; i-- { + line := strings.TrimSpace(lines[i]) + if line == "" { + continue + } + + var event FeedEvent + if err := json.Unmarshal([]byte(line), &event); err != nil { + continue + } + + // Parse timestamp + ts, err := time.Parse(time.RFC3339, event.Timestamp) + if err != nil { + continue + } + + // Stop if we've gone past the window + if ts.Before(cutoff) { + break + } + + result = append(result, event) + } + return result } -// cleanupStaleState removes old entries from tracking maps. -func (c *Curator) cleanupStaleState() { - c.mu.Lock() - defer c.mu.Unlock() +// readRecentEvents reads events from the events file within the given time window. +// ZFC: This is the observable state that replaces in-memory caching. +// Uses tail-like reading for performance (reads last N lines). +func (c *Curator) readRecentEvents(window time.Duration) []events.Event { + eventsPath := filepath.Join(c.townRoot, events.EventsFile) + + // Read the file (for small files, this is fine; for large files, consider tail-like reading) + data, err := os.ReadFile(eventsPath) + if err != nil { + return nil + } now := time.Now() - staleThreshold := 5 * time.Minute + cutoff := now.Add(-window) + var result []events.Event - // Clean stale done records - for actor, ts := range c.recentDone { - if now.Sub(ts) > staleThreshold { - delete(c.recentDone, actor) + // Parse lines from the end (most recent first) for efficiency + lines := strings.Split(string(data), "\n") + for i := len(lines) - 1; i >= 0; i-- { + line := strings.TrimSpace(lines[i]) + if line == "" { + continue } + + var event events.Event + if err := json.Unmarshal([]byte(line), &event); err != nil { + continue + } + + // Parse timestamp + ts, err := time.Parse(time.RFC3339, event.Timestamp) + if err != nil { + continue + } + + // Stop if we've gone past the window + if ts.Before(cutoff) { + break + } + + result = append(result, event) } - // Clean stale sling records - for actor, records := range c.recentSling { - c.recentSling[actor] = c.pruneRecords(records, staleThreshold) - if len(c.recentSling[actor]) == 0 { - delete(c.recentSling, actor) + return result +} + +// countRecentSlings counts sling events from an actor within the given window. +// ZFC: Derives count from the events file, not in-memory cache. +func (c *Curator) countRecentSlings(actor string, window time.Duration) int { + recentEvents := c.readRecentEvents(window) + count := 0 + for _, e := range recentEvents { + if e.Type == events.TypeSling && e.Actor == actor { + count++ } } - - // Reset mail counts - c.recentMail = make(map[string]int) + return count } // writeFeedEvent writes a curated event to the feed file. +// ZFC: Aggregation is derived from the events file, not in-memory cache. func (c *Curator) writeFeedEvent(event *events.Event) { feedEvent := FeedEvent{ Timestamp: event.Timestamp, @@ -268,15 +288,14 @@ func (c *Curator) writeFeedEvent(event *events.Event) { Payload: event.Payload, } - // Check for aggregation opportunity - c.mu.Lock() + // Check for aggregation opportunity (ZFC: derive from events file) if event.Type == events.TypeSling { - if records := c.recentSling[event.Actor]; len(records) >= minAggregateCount { - feedEvent.Count = len(records) - feedEvent.Summary = fmt.Sprintf("%s dispatching work to %d agents", event.Actor, len(records)) + slingCount := c.countRecentSlings(event.Actor, slingAggregateWindow) + if slingCount >= minAggregateCount { + feedEvent.Count = slingCount + feedEvent.Summary = fmt.Sprintf("%s dispatching work to %d agents", event.Actor, slingCount) } } - c.mu.Unlock() data, err := json.Marshal(feedEvent) if err != nil { diff --git a/internal/git/git.go b/internal/git/git.go index 567d1466..d8f06d0a 100644 --- a/internal/git/git.go +++ b/internal/git/git.go @@ -11,7 +11,63 @@ import ( "strings" ) -// Common errors +// GitError contains raw output from a git command for agent observation. +// ZFC: Callers observe the raw output and decide what to do. +// The error interface methods provide human-readable messages, but agents +// should use Stdout/Stderr for programmatic observation. +type GitError struct { + Command string // The git command that failed (e.g., "merge", "push") + Args []string + Stdout string // Raw stdout output + Stderr string // Raw stderr output + Err error // Underlying error (e.g., exit code) +} + +func (e *GitError) Error() string { + if e.Stderr != "" { + return fmt.Sprintf("git %s: %s", e.Command, e.Stderr) + } + return fmt.Sprintf("git %s: %v", e.Command, e.Err) +} + +func (e *GitError) Unwrap() error { + return e.Err +} + +// HasConflict returns true if the error output indicates a merge conflict. +// Deprecated: This exists for backwards compatibility. Agents should observe +// Stderr directly and make their own decisions (ZFC principle). +func (e *GitError) HasConflict() bool { + return strings.Contains(e.Stderr, "CONFLICT") || + strings.Contains(e.Stderr, "Merge conflict") || + strings.Contains(e.Stdout, "CONFLICT") +} + +// HasAuthFailure returns true if the error output indicates authentication failure. +// Deprecated: This exists for backwards compatibility. Agents should observe +// Stderr directly and make their own decisions (ZFC principle). +func (e *GitError) HasAuthFailure() bool { + return strings.Contains(e.Stderr, "Authentication failed") || + strings.Contains(e.Stderr, "could not read Username") +} + +// IsNotARepo returns true if the error indicates the path is not a git repository. +// Deprecated: This exists for backwards compatibility. Agents should observe +// Stderr directly and make their own decisions (ZFC principle). +func (e *GitError) IsNotARepo() bool { + return strings.Contains(e.Stderr, "not a git repository") +} + +// HasRebaseConflict returns true if the error indicates a rebase conflict. +// Deprecated: This exists for backwards compatibility. Agents should observe +// Stderr directly and make their own decisions (ZFC principle). +func (e *GitError) HasRebaseConflict() bool { + return strings.Contains(e.Stderr, "needs merge") || + strings.Contains(e.Stderr, "rebase in progress") +} + +// Common errors - deprecated, kept for backwards compatibility. +// ZFC: These should not be used; observe GitError.Stderr instead. var ( ErrNotARepo = errors.New("not a git repository") ErrMergeConflict = errors.New("merge conflict") @@ -66,43 +122,48 @@ func (g *Git) run(args ...string) (string, error) { err := cmd.Run() if err != nil { - return "", g.wrapError(err, stderr.String(), args) + return "", g.wrapError(err, stdout.String(), stderr.String(), args) } return strings.TrimSpace(stdout.String()), nil } // wrapError wraps git errors with context. -func (g *Git) wrapError(err error, stderr string, args []string) error { +// ZFC: Returns GitError with raw output for agent observation. +// Does not detect or interpret error types - agents should observe and decide. +func (g *Git) wrapError(err error, stdout, stderr string, args []string) error { + stdout = strings.TrimSpace(stdout) stderr = strings.TrimSpace(stderr) - // Detect specific error types - if strings.Contains(stderr, "not a git repository") { - return ErrNotARepo + // Determine command name (first arg, or first non-flag arg) + command := "" + for _, arg := range args { + if !strings.HasPrefix(arg, "-") { + command = arg + break + } } - if strings.Contains(stderr, "CONFLICT") || strings.Contains(stderr, "Merge conflict") { - return ErrMergeConflict - } - if strings.Contains(stderr, "Authentication failed") || strings.Contains(stderr, "could not read Username") { - return ErrAuthFailure - } - if strings.Contains(stderr, "needs merge") || strings.Contains(stderr, "rebase in progress") { - return ErrRebaseConflict + if command == "" && len(args) > 0 { + command = args[0] } - if stderr != "" { - return fmt.Errorf("git %s: %s", args[0], stderr) + return &GitError{ + Command: command, + Args: args, + Stdout: stdout, + Stderr: stderr, + Err: err, } - return fmt.Errorf("git %s: %w", args[0], err) } // Clone clones a repository to the destination. func (g *Git) Clone(url, dest string) error { cmd := exec.Command("git", "clone", url, dest) - var stderr bytes.Buffer + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return g.wrapError(err, stderr.String(), []string{"clone", url}) + return g.wrapError(err, stdout.String(), stderr.String(), []string{"clone", url}) } // Configure hooks path for Gas Town clones if err := configureHooksPath(dest); err != nil { @@ -116,10 +177,11 @@ func (g *Git) Clone(url, dest string) error { // This saves disk by sharing objects without changing remotes. func (g *Git) CloneWithReference(url, dest, reference string) error { cmd := exec.Command("git", "clone", "--reference-if-able", reference, url, dest) - var stderr bytes.Buffer + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return g.wrapError(err, stderr.String(), []string{"clone", "--reference-if-able", url}) + return g.wrapError(err, stdout.String(), stderr.String(), []string{"clone", "--reference-if-able", url}) } // Configure hooks path for Gas Town clones if err := configureHooksPath(dest); err != nil { @@ -133,10 +195,11 @@ func (g *Git) CloneWithReference(url, dest, reference string) error { // This is used for the shared repo architecture where all worktrees share a single git database. func (g *Git) CloneBare(url, dest string) error { cmd := exec.Command("git", "clone", "--bare", url, dest) - var stderr bytes.Buffer + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return g.wrapError(err, stderr.String(), []string{"clone", "--bare", url}) + return g.wrapError(err, stdout.String(), stderr.String(), []string{"clone", "--bare", url}) } // Configure refspec so worktrees can fetch and see origin/* refs return configureRefspec(dest) @@ -179,10 +242,11 @@ func configureRefspec(repoPath string) error { // CloneBareWithReference clones a bare repository using a local repo as an object reference. func (g *Git) CloneBareWithReference(url, dest, reference string) error { cmd := exec.Command("git", "clone", "--bare", "--reference-if-able", reference, url, dest) - var stderr bytes.Buffer + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - return g.wrapError(err, stderr.String(), []string{"clone", "--bare", "--reference-if-able", url}) + return g.wrapError(err, stdout.String(), stderr.String(), []string{"clone", "--bare", "--reference-if-able", url}) } // Configure refspec so worktrees can fetch and see origin/* refs return configureRefspec(dest) @@ -414,8 +478,8 @@ func (g *Git) CheckConflicts(source, target string) ([]string, error) { return conflicts, nil } - // Check if it's a conflict error from wrapper - if errors.Is(mergeErr, ErrMergeConflict) { + // ZFC: Check if the error output indicates a conflict + if gitErr, ok := mergeErr.(*GitError); ok && gitErr.HasConflict() { _ = g.AbortMerge() // best-effort cleanup return conflicts, nil } @@ -432,7 +496,7 @@ func (g *Git) CheckConflicts(source, target string) ([]string, error) { } // runMergeCheck runs a git merge command and returns error info from both stdout and stderr. -// This is needed because git merge outputs CONFLICT info to stdout. +// ZFC: Returns GitError with raw output for agent observation. func (g *Git) runMergeCheck(args ...string) (string, error) { cmd := exec.Command("git", args...) cmd.Dir = g.workDir @@ -443,13 +507,8 @@ func (g *Git) runMergeCheck(args ...string) (string, error) { err := cmd.Run() if err != nil { - // Check stdout for CONFLICT message (git sends it there) - stdoutStr := stdout.String() - if strings.Contains(stdoutStr, "CONFLICT") { - return "", ErrMergeConflict - } - // Fall back to stderr check - return "", g.wrapError(err, stderr.String(), args) + // ZFC: Return raw output for observation, don't interpret CONFLICT + return "", g.wrapError(err, stdout.String(), stderr.String(), args) } return strings.TrimSpace(stdout.String()), nil diff --git a/internal/git/git_test.go b/internal/git/git_test.go index 57d805bd..25fd3f8c 100644 --- a/internal/git/git_test.go +++ b/internal/git/git_test.go @@ -214,8 +214,10 @@ func TestNotARepo(t *testing.T) { g := NewGit(dir) _, err := g.CurrentBranch() - if err != ErrNotARepo { - t.Errorf("expected ErrNotARepo, got %v", err) + // ZFC: Check for not-a-repo via GitError method instead of sentinel error + gitErr, ok := err.(*GitError) + if !ok || !gitErr.IsNotARepo() { + t.Errorf("expected GitError with IsNotARepo(), got %v", err) } } diff --git a/internal/refinery/engineer.go b/internal/refinery/engineer.go index 444ef353..15a96199 100644 --- a/internal/refinery/engineer.go +++ b/internal/refinery/engineer.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "os" @@ -312,7 +311,8 @@ func (e *Engineer) doMerge(ctx context.Context, branch, target, sourceIssue stri } _, _ = fmt.Fprintf(e.output, "[Engineer] Merging with message: %s\n", mergeMsg) if err := e.git.MergeNoFF(branch, mergeMsg); err != nil { - if errors.Is(err, git.ErrMergeConflict) { + // ZFC: Check for conflict via GitError method instead of sentinel error + if gitErr, ok := err.(*git.GitError); ok && gitErr.HasConflict() { _ = e.git.AbortMerge() return ProcessResult{ Success: false, diff --git a/internal/swarm/integration.go b/internal/swarm/integration.go index f84ea765..8c27ad69 100644 --- a/internal/swarm/integration.go +++ b/internal/swarm/integration.go @@ -12,10 +12,33 @@ import ( var ( ErrBranchExists = errors.New("branch already exists") ErrBranchNotFound = errors.New("branch not found") - ErrMergeConflict = errors.New("merge conflict") ErrNotOnIntegration = errors.New("not on integration branch") ) +// SwarmGitError contains raw output from a git command for observation. +// ZFC: Callers observe the raw output and decide what to do. +type SwarmGitError struct { + Command string + Stdout string + Stderr string + Err error +} + +func (e *SwarmGitError) Error() string { + if e.Stderr != "" { + return fmt.Sprintf("%s: %s", e.Command, e.Stderr) + } + return fmt.Sprintf("%s: %v", e.Command, e.Err) +} + +// HasConflict returns true if the error output indicates a merge conflict. +// Deprecated: Agents should observe Stderr directly (ZFC principle). +func (e *SwarmGitError) HasConflict() bool { + return strings.Contains(e.Stderr, "CONFLICT") || + strings.Contains(e.Stderr, "Merge conflict") || + strings.Contains(e.Stdout, "CONFLICT") +} + // CreateIntegrationBranch creates the integration branch for a swarm. // The branch is created from the swarm's BaseCommit and pushed to origin. func (m *Manager) CreateIntegrationBranch(swarmID string) error { @@ -69,10 +92,9 @@ func (m *Manager) MergeToIntegration(swarmID, workerBranch string) error { fmt.Sprintf("Merge %s into %s", workerBranch, swarm.Integration), workerBranch) if err != nil { - // Check if it's a merge conflict - if strings.Contains(err.Error(), "CONFLICT") || - strings.Contains(err.Error(), "Merge conflict") { - return ErrMergeConflict + // ZFC: Check for conflict via SwarmGitError method + if gitErr, ok := err.(*SwarmGitError); ok && gitErr.HasConflict() { + return gitErr // Return the error with raw output for observation } return fmt.Errorf("merging: %w", err) } @@ -105,8 +127,9 @@ func (m *Manager) LandToMain(swarmID string) error { fmt.Sprintf("Land swarm %s", swarmID), swarm.Integration) if err != nil { - if strings.Contains(err.Error(), "CONFLICT") { - return ErrMergeConflict + // ZFC: Check for conflict via SwarmGitError method + if gitErr, ok := err.(*SwarmGitError); ok && gitErr.HasConflict() { + return gitErr // Return the error with raw output for observation } return fmt.Errorf("merging to %s: %w", swarm.TargetBranch, err) } @@ -185,19 +208,34 @@ func (m *Manager) getCurrentBranch() (string, error) { } // gitRun executes a git command. +// ZFC: Returns SwarmGitError with raw output for agent observation. func (m *Manager) gitRun(args ...string) error { cmd := exec.Command("git", args...) cmd.Dir = m.gitDir - var stderr bytes.Buffer + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Run(); err != nil { - errMsg := strings.TrimSpace(stderr.String()) - if errMsg != "" { - return fmt.Errorf("%s: %s", args[0], errMsg) + // Determine command name + command := "" + for _, arg := range args { + if !strings.HasPrefix(arg, "-") { + command = arg + break + } + } + if command == "" && len(args) > 0 { + command = args[0] + } + + return &SwarmGitError{ + Command: command, + Stdout: strings.TrimSpace(stdout.String()), + Stderr: strings.TrimSpace(stderr.String()), + Err: err, } - return fmt.Errorf("%s: %w", args[0], err) } return nil