perf: parallelize mail inbox queries for ~6x speedup
The listFromDir function was making 3-6 serial bd subprocess calls (one per identity variant × status). This caused gt mail inbox to take ~32 seconds in typical setups. Change to run all queries in parallel using goroutines, reducing inbox load time to ~5 seconds. Implementation notes: - Pre-allocate results slice indexed by query position (no mutex needed) - Deduplication happens after wg.Wait() in single-threaded collection - Existing error handling preserved (partial success allowed) Fixes #705
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"regexp"
|
"regexp"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/steveyegge/gastown/internal/beads"
|
"github.com/steveyegge/gastown/internal/beads"
|
||||||
@@ -107,45 +108,76 @@ func (m *Mailbox) listBeads() ([]*Message, error) {
|
|||||||
return messages, nil
|
return messages, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// queryResult holds the result of a single query.
|
||||||
|
type queryResult struct {
|
||||||
|
messages []*Message
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
// listFromDir queries messages from a beads directory.
|
// listFromDir queries messages from a beads directory.
|
||||||
// Returns messages where identity is the assignee OR a CC recipient.
|
// Returns messages where identity is the assignee OR a CC recipient.
|
||||||
// Includes both open and hooked messages (hooked = auto-assigned handoff mail).
|
// Includes both open and hooked messages (hooked = auto-assigned handoff mail).
|
||||||
// If all queries fail, returns the last error encountered.
|
// If all queries fail, returns the last error encountered.
|
||||||
|
// Queries are parallelized for performance (~6x speedup).
|
||||||
func (m *Mailbox) listFromDir(beadsDir string) ([]*Message, error) {
|
func (m *Mailbox) listFromDir(beadsDir string) ([]*Message, error) {
|
||||||
|
// Get all identity variants to query (handles legacy vs normalized formats)
|
||||||
|
identities := m.identityVariants()
|
||||||
|
|
||||||
|
// Build list of queries to run in parallel
|
||||||
|
type querySpec struct {
|
||||||
|
filterFlag string
|
||||||
|
filterValue string
|
||||||
|
status string
|
||||||
|
}
|
||||||
|
var queries []querySpec
|
||||||
|
|
||||||
|
// Assignee queries for each identity variant in both open and hooked statuses
|
||||||
|
for _, identity := range identities {
|
||||||
|
for _, status := range []string{"open", "hooked"} {
|
||||||
|
queries = append(queries, querySpec{
|
||||||
|
filterFlag: "--assignee",
|
||||||
|
filterValue: identity,
|
||||||
|
status: status,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CC queries for each identity variant (open only)
|
||||||
|
for _, identity := range identities {
|
||||||
|
queries = append(queries, querySpec{
|
||||||
|
filterFlag: "--label",
|
||||||
|
filterValue: "cc:" + identity,
|
||||||
|
status: "open",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute all queries in parallel
|
||||||
|
results := make([]queryResult, len(queries))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(queries))
|
||||||
|
|
||||||
|
for i, q := range queries {
|
||||||
|
go func(idx int, spec querySpec) {
|
||||||
|
defer wg.Done()
|
||||||
|
msgs, err := m.queryMessages(beadsDir, spec.filterFlag, spec.filterValue, spec.status)
|
||||||
|
results[idx] = queryResult{messages: msgs, err: err}
|
||||||
|
}(i, q)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Collect results
|
||||||
seen := make(map[string]bool)
|
seen := make(map[string]bool)
|
||||||
var messages []*Message
|
var messages []*Message
|
||||||
var lastErr error
|
var lastErr error
|
||||||
anySucceeded := false
|
anySucceeded := false
|
||||||
|
|
||||||
// Get all identity variants to query (handles legacy vs normalized formats)
|
for _, r := range results {
|
||||||
identities := m.identityVariants()
|
if r.err != nil {
|
||||||
|
lastErr = r.err
|
||||||
// Query for each identity variant in both open and hooked statuses
|
|
||||||
for _, identity := range identities {
|
|
||||||
for _, status := range []string{"open", "hooked"} {
|
|
||||||
msgs, err := m.queryMessages(beadsDir, "--assignee", identity, status)
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
} else {
|
|
||||||
anySucceeded = true
|
|
||||||
for _, msg := range msgs {
|
|
||||||
if !seen[msg.ID] {
|
|
||||||
seen[msg.ID] = true
|
|
||||||
messages = append(messages, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query for CC'd messages (open only)
|
|
||||||
for _, identity := range identities {
|
|
||||||
ccMsgs, err := m.queryMessages(beadsDir, "--label", "cc:"+identity, "open")
|
|
||||||
if err != nil {
|
|
||||||
lastErr = err
|
|
||||||
} else {
|
} else {
|
||||||
anySucceeded = true
|
anySucceeded = true
|
||||||
for _, msg := range ccMsgs {
|
for _, msg := range r.messages {
|
||||||
if !seen[msg.ID] {
|
if !seen[msg.ID] {
|
||||||
seen[msg.ID] = true
|
seen[msg.ID] = true
|
||||||
messages = append(messages, msg)
|
messages = append(messages, msg)
|
||||||
@@ -250,17 +282,21 @@ func (m *Mailbox) listLegacy() ([]*Message, error) {
|
|||||||
|
|
||||||
// ListUnread returns unread (open) messages.
|
// ListUnread returns unread (open) messages.
|
||||||
func (m *Mailbox) ListUnread() ([]*Message, error) {
|
func (m *Mailbox) ListUnread() ([]*Message, error) {
|
||||||
all, err := m.List()
|
if m.legacy {
|
||||||
if err != nil {
|
all, err := m.List()
|
||||||
return nil, err
|
if err != nil {
|
||||||
}
|
return nil, err
|
||||||
var unread []*Message
|
|
||||||
for _, msg := range all {
|
|
||||||
if !msg.Read {
|
|
||||||
unread = append(unread, msg)
|
|
||||||
}
|
}
|
||||||
|
var unread []*Message
|
||||||
|
for _, msg := range all {
|
||||||
|
if !msg.Read {
|
||||||
|
unread = append(unread, msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return unread, nil
|
||||||
}
|
}
|
||||||
return unread, nil
|
// For beads, inbox only returns open (unread) messages
|
||||||
|
return m.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a message by ID.
|
// Get returns a message by ID.
|
||||||
|
|||||||
Reference in New Issue
Block a user