fix(dolt): Optimize N+1 queries and add proper test timeouts
- Add batch query optimization to avoid N+1 queries in scanIssueIDs - Create GetIssuesByIDs helper to fetch multiple issues in single query - Add scanIssueRow helper to scan issue data from rows iterator - Add proper timeout contexts to all Dolt tests using testContext helper The embedded Dolt driver is slow for repeated queries. Replacing N+1 GetIssue calls with a single IN clause query fixes the 30s+ timeouts in TestDoltStoreDependencies, TestDoltStoreSearch, and TestDoltStoreGetReadyWork. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
committed by
gastown/crew/dennis
parent
b51c0101ba
commit
99d6592207
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steveyegge/beads/internal/types"
|
||||
)
|
||||
@@ -449,21 +450,232 @@ func (s *DoltStore) GetNewlyUnblockedByClose(ctx context.Context, closedIssueID
|
||||
// Helper functions
|
||||
|
||||
func (s *DoltStore) scanIssueIDs(ctx context.Context, rows *sql.Rows) ([]*types.Issue, error) {
|
||||
var issues []*types.Issue
|
||||
// First, collect all IDs
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan issue id: %w", err)
|
||||
}
|
||||
issue, err := s.GetIssue(ctx, id)
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Fetch all issues in a single batch query
|
||||
return s.GetIssuesByIDs(ctx, ids)
|
||||
}
|
||||
|
||||
// GetIssuesByIDs retrieves multiple issues by ID in a single query to avoid N+1 performance issues
|
||||
func (s *DoltStore) GetIssuesByIDs(ctx context.Context, ids []string) ([]*types.Issue, error) {
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Build IN clause
|
||||
placeholders := make([]string, len(ids))
|
||||
args := make([]interface{}, len(ids))
|
||||
for i, id := range ids {
|
||||
placeholders[i] = "?"
|
||||
args[i] = id
|
||||
}
|
||||
|
||||
// nolint:gosec // G201: placeholders contains only ? markers, actual values passed via args
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, content_hash, title, description, design, acceptance_criteria, notes,
|
||||
status, priority, issue_type, assignee, estimated_minutes,
|
||||
created_at, created_by, owner, updated_at, closed_at, external_ref,
|
||||
compaction_level, compacted_at, compacted_at_commit, original_size, source_repo, close_reason,
|
||||
deleted_at, deleted_by, delete_reason, original_type,
|
||||
sender, ephemeral, pinned, is_template, crystallizes,
|
||||
await_type, await_id, timeout_ns, waiters,
|
||||
hook_bead, role_bead, agent_state, last_activity, role_type, rig, mol_type,
|
||||
event_kind, actor, target, payload,
|
||||
due_at, defer_until,
|
||||
quality_score, work_type, source_system
|
||||
FROM issues
|
||||
WHERE id IN (%s)
|
||||
`, strings.Join(placeholders, ","))
|
||||
|
||||
queryRows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get issues by IDs: %w", err)
|
||||
}
|
||||
defer queryRows.Close()
|
||||
|
||||
var issues []*types.Issue
|
||||
for queryRows.Next() {
|
||||
issue, err := scanIssueRow(queryRows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if issue != nil {
|
||||
issues = append(issues, issue)
|
||||
}
|
||||
issues = append(issues, issue)
|
||||
}
|
||||
return issues, rows.Err()
|
||||
|
||||
return issues, queryRows.Err()
|
||||
}
|
||||
|
||||
// scanIssueRow scans a single issue from a rows result
|
||||
func scanIssueRow(rows *sql.Rows) (*types.Issue, error) {
|
||||
var issue types.Issue
|
||||
var closedAt, compactedAt, deletedAt, lastActivity, dueAt, deferUntil sql.NullTime
|
||||
var estimatedMinutes, originalSize, timeoutNs sql.NullInt64
|
||||
var assignee, externalRef, compactedAtCommit, owner sql.NullString
|
||||
var contentHash, sourceRepo, closeReason, deletedBy, deleteReason, originalType sql.NullString
|
||||
var workType, sourceSystem sql.NullString
|
||||
var sender, molType, eventKind, actor, target, payload sql.NullString
|
||||
var awaitType, awaitID, waiters sql.NullString
|
||||
var hookBead, roleBead, agentState, roleType, rig sql.NullString
|
||||
var ephemeral, pinned, isTemplate, crystallizes sql.NullInt64
|
||||
var qualityScore sql.NullFloat64
|
||||
|
||||
if err := rows.Scan(
|
||||
&issue.ID, &contentHash, &issue.Title, &issue.Description, &issue.Design,
|
||||
&issue.AcceptanceCriteria, &issue.Notes, &issue.Status,
|
||||
&issue.Priority, &issue.IssueType, &assignee, &estimatedMinutes,
|
||||
&issue.CreatedAt, &issue.CreatedBy, &owner, &issue.UpdatedAt, &closedAt, &externalRef,
|
||||
&issue.CompactionLevel, &compactedAt, &compactedAtCommit, &originalSize, &sourceRepo, &closeReason,
|
||||
&deletedAt, &deletedBy, &deleteReason, &originalType,
|
||||
&sender, &ephemeral, &pinned, &isTemplate, &crystallizes,
|
||||
&awaitType, &awaitID, &timeoutNs, &waiters,
|
||||
&hookBead, &roleBead, &agentState, &lastActivity, &roleType, &rig, &molType,
|
||||
&eventKind, &actor, &target, &payload,
|
||||
&dueAt, &deferUntil,
|
||||
&qualityScore, &workType, &sourceSystem,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan issue row: %w", err)
|
||||
}
|
||||
|
||||
// Map nullable fields
|
||||
if contentHash.Valid {
|
||||
issue.ContentHash = contentHash.String
|
||||
}
|
||||
if closedAt.Valid {
|
||||
issue.ClosedAt = &closedAt.Time
|
||||
}
|
||||
if estimatedMinutes.Valid {
|
||||
mins := int(estimatedMinutes.Int64)
|
||||
issue.EstimatedMinutes = &mins
|
||||
}
|
||||
if assignee.Valid {
|
||||
issue.Assignee = assignee.String
|
||||
}
|
||||
if owner.Valid {
|
||||
issue.Owner = owner.String
|
||||
}
|
||||
if externalRef.Valid {
|
||||
issue.ExternalRef = &externalRef.String
|
||||
}
|
||||
if compactedAt.Valid {
|
||||
issue.CompactedAt = &compactedAt.Time
|
||||
}
|
||||
if compactedAtCommit.Valid {
|
||||
issue.CompactedAtCommit = &compactedAtCommit.String
|
||||
}
|
||||
if originalSize.Valid {
|
||||
issue.OriginalSize = int(originalSize.Int64)
|
||||
}
|
||||
if sourceRepo.Valid {
|
||||
issue.SourceRepo = sourceRepo.String
|
||||
}
|
||||
if closeReason.Valid {
|
||||
issue.CloseReason = closeReason.String
|
||||
}
|
||||
if deletedAt.Valid {
|
||||
issue.DeletedAt = &deletedAt.Time
|
||||
}
|
||||
if deletedBy.Valid {
|
||||
issue.DeletedBy = deletedBy.String
|
||||
}
|
||||
if deleteReason.Valid {
|
||||
issue.DeleteReason = deleteReason.String
|
||||
}
|
||||
if originalType.Valid {
|
||||
issue.OriginalType = originalType.String
|
||||
}
|
||||
if sender.Valid {
|
||||
issue.Sender = sender.String
|
||||
}
|
||||
if ephemeral.Valid && ephemeral.Int64 != 0 {
|
||||
issue.Ephemeral = true
|
||||
}
|
||||
if pinned.Valid && pinned.Int64 != 0 {
|
||||
issue.Pinned = true
|
||||
}
|
||||
if isTemplate.Valid && isTemplate.Int64 != 0 {
|
||||
issue.IsTemplate = true
|
||||
}
|
||||
if crystallizes.Valid && crystallizes.Int64 != 0 {
|
||||
issue.Crystallizes = true
|
||||
}
|
||||
if awaitType.Valid {
|
||||
issue.AwaitType = awaitType.String
|
||||
}
|
||||
if awaitID.Valid {
|
||||
issue.AwaitID = awaitID.String
|
||||
}
|
||||
if timeoutNs.Valid {
|
||||
issue.Timeout = time.Duration(timeoutNs.Int64)
|
||||
}
|
||||
if waiters.Valid && waiters.String != "" {
|
||||
issue.Waiters = parseJSONStringArray(waiters.String)
|
||||
}
|
||||
if hookBead.Valid {
|
||||
issue.HookBead = hookBead.String
|
||||
}
|
||||
if roleBead.Valid {
|
||||
issue.RoleBead = roleBead.String
|
||||
}
|
||||
if agentState.Valid {
|
||||
issue.AgentState = types.AgentState(agentState.String)
|
||||
}
|
||||
if lastActivity.Valid {
|
||||
issue.LastActivity = &lastActivity.Time
|
||||
}
|
||||
if roleType.Valid {
|
||||
issue.RoleType = roleType.String
|
||||
}
|
||||
if rig.Valid {
|
||||
issue.Rig = rig.String
|
||||
}
|
||||
if molType.Valid {
|
||||
issue.MolType = types.MolType(molType.String)
|
||||
}
|
||||
if eventKind.Valid {
|
||||
issue.EventKind = eventKind.String
|
||||
}
|
||||
if actor.Valid {
|
||||
issue.Actor = actor.String
|
||||
}
|
||||
if target.Valid {
|
||||
issue.Target = target.String
|
||||
}
|
||||
if payload.Valid {
|
||||
issue.Payload = payload.String
|
||||
}
|
||||
if dueAt.Valid {
|
||||
issue.DueAt = &dueAt.Time
|
||||
}
|
||||
if deferUntil.Valid {
|
||||
issue.DeferUntil = &deferUntil.Time
|
||||
}
|
||||
if qualityScore.Valid {
|
||||
qs := float32(qualityScore.Float64)
|
||||
issue.QualityScore = &qs
|
||||
}
|
||||
if workType.Valid {
|
||||
issue.WorkType = types.WorkType(workType.String)
|
||||
}
|
||||
if sourceSystem.Valid {
|
||||
issue.SourceSystem = sourceSystem.String
|
||||
}
|
||||
|
||||
return &issue, nil
|
||||
}
|
||||
|
||||
func scanDependencyRows(rows *sql.Rows) ([]*types.Dependency, error) {
|
||||
|
||||
@@ -109,7 +109,8 @@ func TestDoltStoreConfig(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Test SetConfig
|
||||
if err := store.SetConfig(ctx, "test_key", "test_value"); err != nil {
|
||||
@@ -151,7 +152,8 @@ func TestDoltStoreIssue(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -188,7 +190,8 @@ func TestDoltStoreIssueUpdate(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -234,7 +237,8 @@ func TestDoltStoreIssueClose(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -271,7 +275,8 @@ func TestDoltStoreLabels(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -412,7 +417,8 @@ func TestDoltStoreSearch(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create multiple issues
|
||||
issues := []*types.Issue{
|
||||
@@ -482,7 +488,8 @@ func TestDoltStoreCreateIssues(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create multiple issues in batch
|
||||
issues := []*types.Issue{
|
||||
@@ -527,7 +534,8 @@ func TestDoltStoreComments(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -574,7 +582,8 @@ func TestDoltStoreEvents(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue (this creates a creation event)
|
||||
issue := &types.Issue{
|
||||
@@ -609,7 +618,8 @@ func TestDoltStoreDeleteIssue(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue
|
||||
issue := &types.Issue{
|
||||
@@ -650,7 +660,8 @@ func TestDoltStoreDirtyTracking(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create an issue (marks it dirty)
|
||||
issue := &types.Issue{
|
||||
@@ -703,7 +714,8 @@ func TestDoltStoreStatistics(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create some issues
|
||||
issues := []*types.Issue{
|
||||
@@ -793,7 +805,8 @@ func TestDoltStoreGetReadyWork(t *testing.T) {
|
||||
store, cleanup := setupTestStore(t)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := testContext(t)
|
||||
defer cancel()
|
||||
|
||||
// Create issues: one blocked, one ready
|
||||
blocker := &types.Issue{
|
||||
|
||||
Reference in New Issue
Block a user