refactor(zfc): derive state from files instead of in-memory cache
Apply the ZFC (Zero Forge Cache) principle across git error handling and feed curation. Agents now observe raw git output and make their own decisions rather than relying on pre-interpreted error types.

- Add GitError type with raw stdout/stderr for observation
- Add SwarmGitError following the same pattern
- Remove in-memory deduplication maps from Curator
- Curator now reads state from feed/events files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
Steve Yegge
parent
b92e46474a
commit
131dac91c8
@@ -16,6 +16,7 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -37,22 +38,12 @@ type FeedEvent struct {
|
||||
}
|
||||
|
||||
// Curator manages the feed curation process.
|
||||
// ZFC: State is derived from the events file, not cached in memory.
|
||||
type Curator struct {
|
||||
townRoot string
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Deduplication state
|
||||
mu sync.Mutex
|
||||
recentDone map[string]time.Time // actor → last done time (dedupe repeated done events)
|
||||
recentSling map[string][]slingRecord // actor → recent slings (aggregate)
|
||||
recentMail map[string]int // actor → mail count in window (aggregate)
|
||||
}
|
||||
|
||||
type slingRecord struct {
|
||||
target string
|
||||
ts time.Time
|
||||
}
|
||||
|
||||
// Deduplication/aggregation settings
|
||||
@@ -74,12 +65,9 @@ const (
|
||||
func NewCurator(townRoot string) *Curator {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Curator{
|
||||
townRoot: townRoot,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
recentDone: make(map[string]time.Time),
|
||||
recentSling: make(map[string][]slingRecord),
|
||||
recentMail: make(map[string]int),
|
||||
townRoot: townRoot,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,6 +100,7 @@ func (c *Curator) Stop() {
|
||||
}
|
||||
|
||||
// run is the main curator loop.
|
||||
// ZFC: No in-memory state to clean up - state is derived from the events file.
|
||||
func (c *Curator) run(file *os.File) {
|
||||
defer c.wg.Done()
|
||||
defer file.Close()
|
||||
@@ -120,18 +109,11 @@ func (c *Curator) run(file *os.File) {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Cleanup ticker for stale aggregation state
|
||||
cleanupTicker := time.NewTicker(time.Minute)
|
||||
defer cleanupTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
|
||||
case <-cleanupTicker.C:
|
||||
c.cleanupStaleState()
|
||||
|
||||
case <-ticker.C:
|
||||
// Read available lines
|
||||
for {
|
||||
@@ -171,93 +153,131 @@ func (c *Curator) processLine(line string) {
|
||||
}
|
||||
|
||||
// shouldDedupe checks if an event should be deduplicated.
|
||||
// ZFC: Derives state from the FEED file (what we've already output), not in-memory cache.
|
||||
// Returns true if the event should be dropped.
|
||||
func (c *Curator) shouldDedupe(event *events.Event) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
|
||||
switch event.Type {
|
||||
case events.TypeDone:
|
||||
// Dedupe repeated done events from same actor within window
|
||||
if lastDone, ok := c.recentDone[event.Actor]; ok {
|
||||
if now.Sub(lastDone) < doneDedupeWindow {
|
||||
return true // Skip duplicate
|
||||
// Check if we've already written a done event for this actor to the feed
|
||||
recentFeedEvents := c.readRecentFeedEvents(doneDedupeWindow)
|
||||
for _, e := range recentFeedEvents {
|
||||
if e.Type == events.TypeDone && e.Actor == event.Actor {
|
||||
return true // Skip duplicate (already in feed)
|
||||
}
|
||||
}
|
||||
c.recentDone[event.Actor] = now
|
||||
return false
|
||||
|
||||
case events.TypeSling:
|
||||
// Track for potential aggregation (but don't dedupe single slings)
|
||||
target, _ := event.Payload["target"].(string)
|
||||
c.recentSling[event.Actor] = append(c.recentSling[event.Actor], slingRecord{
|
||||
target: target,
|
||||
ts: now,
|
||||
})
|
||||
// Prune old records
|
||||
c.recentSling[event.Actor] = c.pruneRecords(c.recentSling[event.Actor], slingAggregateWindow)
|
||||
return false
|
||||
|
||||
case events.TypeMail:
|
||||
// Track mail count for potential aggregation
|
||||
c.recentMail[event.Actor]++
|
||||
// Reset after window (rough approximation)
|
||||
go func(actor string) {
|
||||
time.Sleep(mailAggregateWindow)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.recentMail[actor] > 0 {
|
||||
c.recentMail[actor]--
|
||||
}
|
||||
}(event.Actor)
|
||||
return false
|
||||
}
|
||||
|
||||
// Sling and mail events are not deduplicated, only aggregated in writeFeedEvent
|
||||
return false
|
||||
}
|
||||
|
||||
// pruneRecords removes records older than the window.
|
||||
func (c *Curator) pruneRecords(records []slingRecord, window time.Duration) []slingRecord {
|
||||
now := time.Now()
|
||||
result := make([]slingRecord, 0, len(records))
|
||||
for _, r := range records {
|
||||
if now.Sub(r.ts) < window {
|
||||
result = append(result, r)
|
||||
}
|
||||
// readRecentFeedEvents reads feed events from the feed file within the given time window.
|
||||
// ZFC: The feed file is the observable state of what we've already output.
|
||||
func (c *Curator) readRecentFeedEvents(window time.Duration) []FeedEvent {
|
||||
feedPath := filepath.Join(c.townRoot, FeedFile)
|
||||
|
||||
data, err := os.ReadFile(feedPath)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
cutoff := now.Add(-window)
|
||||
var result []FeedEvent
|
||||
|
||||
// Parse lines from the end (most recent first) for efficiency
|
||||
lines := strings.Split(string(data), "\n")
|
||||
for i := len(lines) - 1; i >= 0; i-- {
|
||||
line := strings.TrimSpace(lines[i])
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var event FeedEvent
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
ts, err := time.Parse(time.RFC3339, event.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Stop if we've gone past the window
|
||||
if ts.Before(cutoff) {
|
||||
break
|
||||
}
|
||||
|
||||
result = append(result, event)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// cleanupStaleState removes old entries from tracking maps.
|
||||
func (c *Curator) cleanupStaleState() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
// readRecentEvents reads events from the events file within the given time window.
|
||||
// ZFC: This is the observable state that replaces in-memory caching.
|
||||
// Currently reads the whole file and scans its lines from the end backwards; true tail-like reading is not implemented.
|
||||
func (c *Curator) readRecentEvents(window time.Duration) []events.Event {
|
||||
eventsPath := filepath.Join(c.townRoot, events.EventsFile)
|
||||
|
||||
// Read the file (for small files, this is fine; for large files, consider tail-like reading)
|
||||
data, err := os.ReadFile(eventsPath)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
staleThreshold := 5 * time.Minute
|
||||
cutoff := now.Add(-window)
|
||||
var result []events.Event
|
||||
|
||||
// Clean stale done records
|
||||
for actor, ts := range c.recentDone {
|
||||
if now.Sub(ts) > staleThreshold {
|
||||
delete(c.recentDone, actor)
|
||||
// Parse lines from the end (most recent first) for efficiency
|
||||
lines := strings.Split(string(data), "\n")
|
||||
for i := len(lines) - 1; i >= 0; i-- {
|
||||
line := strings.TrimSpace(lines[i])
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var event events.Event
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
ts, err := time.Parse(time.RFC3339, event.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Stop if we've gone past the window
|
||||
if ts.Before(cutoff) {
|
||||
break
|
||||
}
|
||||
|
||||
result = append(result, event)
|
||||
}
|
||||
|
||||
// Clean stale sling records
|
||||
for actor, records := range c.recentSling {
|
||||
c.recentSling[actor] = c.pruneRecords(records, staleThreshold)
|
||||
if len(c.recentSling[actor]) == 0 {
|
||||
delete(c.recentSling, actor)
|
||||
return result
|
||||
}
|
||||
|
||||
// countRecentSlings counts sling events from an actor within the given window.
|
||||
// ZFC: Derives count from the events file, not in-memory cache.
|
||||
func (c *Curator) countRecentSlings(actor string, window time.Duration) int {
|
||||
recentEvents := c.readRecentEvents(window)
|
||||
count := 0
|
||||
for _, e := range recentEvents {
|
||||
if e.Type == events.TypeSling && e.Actor == actor {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
// Reset mail counts
|
||||
c.recentMail = make(map[string]int)
|
||||
return count
|
||||
}
|
||||
|
||||
// writeFeedEvent writes a curated event to the feed file.
|
||||
// ZFC: Aggregation is derived from the events file, not in-memory cache.
|
||||
func (c *Curator) writeFeedEvent(event *events.Event) {
|
||||
feedEvent := FeedEvent{
|
||||
Timestamp: event.Timestamp,
|
||||
@@ -268,15 +288,14 @@ func (c *Curator) writeFeedEvent(event *events.Event) {
|
||||
Payload: event.Payload,
|
||||
}
|
||||
|
||||
// Check for aggregation opportunity
|
||||
c.mu.Lock()
|
||||
// Check for aggregation opportunity (ZFC: derive from events file)
|
||||
if event.Type == events.TypeSling {
|
||||
if records := c.recentSling[event.Actor]; len(records) >= minAggregateCount {
|
||||
feedEvent.Count = len(records)
|
||||
feedEvent.Summary = fmt.Sprintf("%s dispatching work to %d agents", event.Actor, len(records))
|
||||
slingCount := c.countRecentSlings(event.Actor, slingAggregateWindow)
|
||||
if slingCount >= minAggregateCount {
|
||||
feedEvent.Count = slingCount
|
||||
feedEvent.Summary = fmt.Sprintf("%s dispatching work to %d agents", event.Actor, slingCount)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
data, err := json.Marshal(feedEvent)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user