From 1d40b8f970f160e458e42e9a4eb5541a0dbf67a0 Mon Sep 17 00:00:00 2001 From: Steve Yegge Date: Thu, 18 Dec 2025 01:21:15 -0800 Subject: [PATCH] Phase 2: Dual-write for edge schema consolidation (Decision 004) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When issue fields (replies_to, relates_to, duplicate_of, superseded_by) are set during CreateIssue or UpdateIssue, we now ALSO create the corresponding dependency edges. This enables gradual migration to edge-based storage while maintaining backward compatibility. Changes: - createGraphEdgesFromIssueFields: handles CreateIssue dual-write - createGraphEdgesFromUpdates: handles UpdateIssue dual-write - Replies-to edges include thread_id for efficient thread queries - Uses INSERT OR IGNORE to handle idempotency The dual-write ensures both field and dependency stay in sync during the migration period. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- internal/storage/sqlite/queries.go | 154 +++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/internal/storage/sqlite/queries.go b/internal/storage/sqlite/queries.go index 2a08d9b9..aa4fb4fc 100644 --- a/internal/storage/sqlite/queries.go +++ b/internal/storage/sqlite/queries.go @@ -11,6 +11,150 @@ import ( "github.com/steveyegge/beads/internal/types" ) +// createGraphEdgesFromIssueFields creates dependency edges for issue messaging/graph fields. +// This implements Phase 2 of Edge Schema Consolidation (Decision 004) - dual-write mode. +// When issue fields like RepliesTo, RelatesTo, etc. are set, we also create corresponding +// dependency edges. This ensures both the field and the dependency table stay in sync. +// +// For replies-to edges, we also compute and store the thread_id for efficient thread queries: +// - If parent has a thread_id, inherit it +// - If parent has no thread_id, use the parent's issue ID as the thread root +func createGraphEdgesFromIssueFields(ctx context.Context, conn *sql.Conn, issue *types.Issue, actor string) error { + now := time.Now() + + // Helper to insert a dependency edge (no cycle check needed for new issues) + insertEdge := func(toID string, edgeType types.DependencyType, metadata, threadID string) error { + _, err := conn.ExecContext(ctx, ` + INSERT OR IGNORE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, issue.ID, toID, edgeType, now, actor, metadata, threadID) + return err + } + + // RepliesTo -> replies-to dependency with thread_id + if issue.RepliesTo != "" { + // Compute thread_id: check if parent has a thread_id, otherwise use parent's ID + var parentThreadID string + err := conn.QueryRowContext(ctx, ` + SELECT COALESCE( + (SELECT thread_id FROM dependencies WHERE issue_id = ? AND type = 'replies-to' AND thread_id != '' LIMIT 1), + ? + ) + `, issue.RepliesTo, issue.RepliesTo).Scan(&parentThreadID) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get parent thread_id: %w", err) + } + if parentThreadID == "" { + parentThreadID = issue.RepliesTo // Use parent's ID as thread root + } + + if err := insertEdge(issue.RepliesTo, types.DepRepliesTo, "{}", parentThreadID); err != nil { + return fmt.Errorf("failed to create replies-to edge: %w", err) + } + } + + // RelatesTo -> relates-to dependencies + for _, relatedID := range issue.RelatesTo { + if relatedID != "" { + if err := insertEdge(relatedID, types.DepRelatesTo, "{}", ""); err != nil { + return fmt.Errorf("failed to create relates-to edge for %s: %w", relatedID, err) + } + } + } + + // DuplicateOf -> duplicates dependency + if issue.DuplicateOf != "" { + if err := insertEdge(issue.DuplicateOf, types.DepDuplicates, "{}", ""); err != nil { + return fmt.Errorf("failed to create duplicates edge: %w", err) + } + } + + // SupersededBy -> supersedes dependency (reversed: this issue is superseded BY another) + // So we create: this issue depends on the superseding issue + if issue.SupersededBy != "" { + if err := insertEdge(issue.SupersededBy, types.DepSupersedes, "{}", ""); err != nil { + return fmt.Errorf("failed to create supersedes edge: %w", err) + } + } + + return nil +} + +// createGraphEdgesFromUpdates creates dependency edges when graph fields are updated. +// This implements Phase 2 of Edge Schema Consolidation (Decision 004) for UpdateIssue. +func createGraphEdgesFromUpdates(ctx context.Context, tx *sql.Tx, issueID string, updates map[string]interface{}, actor string) error { + now := time.Now() + + // Helper to insert a dependency edge + insertEdge := func(toID string, edgeType types.DependencyType, metadata, threadID string) error { + _, err := tx.ExecContext(ctx, ` + INSERT OR IGNORE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, issueID, toID, edgeType, now, actor, metadata, threadID) + return err + } + + // RepliesTo -> replies-to dependency with thread_id + if repliesTo, ok := updates["replies_to"]; ok { + if replyID, isString := repliesTo.(string); isString && replyID != "" { + // Compute thread_id + var parentThreadID string + err := tx.QueryRowContext(ctx, ` + SELECT COALESCE( + (SELECT thread_id FROM dependencies WHERE issue_id = ? AND type = 'replies-to' AND thread_id != '' LIMIT 1), + ? + ) + `, replyID, replyID).Scan(&parentThreadID) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get parent thread_id: %w", err) + } + if parentThreadID == "" { + parentThreadID = replyID + } + + if err := insertEdge(replyID, types.DepRepliesTo, "{}", parentThreadID); err != nil { + return fmt.Errorf("failed to create replies-to edge: %w", err) + } + } + } + + // RelatesTo -> relates-to dependencies (JSON string array) + if relatesTo, ok := updates["relates_to"]; ok { + if relatesStr, isString := relatesTo.(string); isString && relatesStr != "" && relatesStr != "[]" { + var relatedIDs []string + if err := json.Unmarshal([]byte(relatesStr), &relatedIDs); err == nil { + for _, relatedID := range relatedIDs { + if relatedID != "" { + if err := insertEdge(relatedID, types.DepRelatesTo, "{}", ""); err != nil { + return fmt.Errorf("failed to create relates-to edge for %s: %w", relatedID, err) + } + } + } + } + } + } + + // DuplicateOf -> duplicates dependency + if duplicateOf, ok := updates["duplicate_of"]; ok { + if dupID, isString := duplicateOf.(string); isString && dupID != "" { + if err := insertEdge(dupID, types.DepDuplicates, "{}", ""); err != nil { + return fmt.Errorf("failed to create duplicates edge: %w", err) + } + } + } + + // SupersededBy -> supersedes dependency + if supersededBy, ok := updates["superseded_by"]; ok { + if supID, isString := supersededBy.(string); isString && supID != "" { + if err := insertEdge(supID, types.DepSupersedes, "{}", ""); err != nil { + return fmt.Errorf("failed to create supersedes edge: %w", err) + } + } + } + + return nil +} + // parseNullableTimeString parses a nullable time string from database TEXT columns. // The ncruces/go-sqlite3 driver only auto-converts TEXT→time.Time for columns declared // as DATETIME/DATE/TIME/TIMESTAMP. For TEXT columns (like deleted_at), we must parse manually. @@ -198,6 +342,11 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act return wrapDBError("record creation event", err) } + // Create graph edges for messaging/graph fields (Phase 2: dual-write - Decision 004) + if err := createGraphEdgesFromIssueFields(ctx, conn, issue, actor); err != nil { + return wrapDBError("create graph edges from issue fields", err) + } + // Mark issue as dirty for incremental export if err := markDirty(ctx, conn, issue.ID); err != nil { return wrapDBError("mark issue dirty", err) @@ -774,6 +923,11 @@ func (s *SQLiteStorage) UpdateIssue(ctx context.Context, id string, updates map[ return fmt.Errorf("failed to record event: %w", err) } + // Create graph edges for messaging/graph fields (Phase 2: dual-write - Decision 004) + if err := createGraphEdgesFromUpdates(ctx, tx, id, updates, actor); err != nil { + return fmt.Errorf("failed to create graph edges from updates: %w", err) + } + // Mark issue as dirty for incremental export _, err = tx.ExecContext(ctx, ` INSERT INTO dirty_issues (issue_id, marked_at)