Phase 2: Dual-write for edge schema consolidation (Decision 004)

When issue fields (replies_to, relates_to, duplicate_of, superseded_by)
are set during CreateIssue or UpdateIssue, we now ALSO create the
corresponding dependency edges. This enables gradual migration to
edge-based storage while maintaining backward compatibility.

Changes:
- createGraphEdgesFromIssueFields: handles CreateIssue dual-write
- createGraphEdgesFromUpdates: handles UpdateIssue dual-write
- Replies-to edges include thread_id for efficient thread queries
- Uses INSERT OR IGNORE to handle idempotency

The dual-write ensures both field and dependency stay in sync during
the migration period.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-18 01:21:15 -08:00
parent d390aa8834
commit 1d40b8f970

View File

@@ -11,6 +11,150 @@ import (
"github.com/steveyegge/beads/internal/types"
)
// createGraphEdgesFromIssueFields creates dependency edges for issue messaging/graph fields.
// This implements Phase 2 of Edge Schema Consolidation (Decision 004) - dual-write mode.
// When issue fields like RepliesTo, RelatesTo, etc. are set, we also create corresponding
// dependency edges. This ensures both the field and the dependency table stay in sync.
//
// For replies-to edges, we also compute and store the thread_id for efficient thread queries:
// - If parent has a thread_id, inherit it
// - If parent has no thread_id, use the parent's issue ID as the thread root
func createGraphEdgesFromIssueFields(ctx context.Context, conn *sql.Conn, issue *types.Issue, actor string) error {
now := time.Now()
// Helper to insert a dependency edge (no cycle check needed for new issues)
insertEdge := func(toID string, edgeType types.DependencyType, metadata, threadID string) error {
_, err := conn.ExecContext(ctx, `
INSERT OR IGNORE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, issue.ID, toID, edgeType, now, actor, metadata, threadID)
return err
}
// RepliesTo -> replies-to dependency with thread_id
if issue.RepliesTo != "" {
// Compute thread_id: check if parent has a thread_id, otherwise use parent's ID
var parentThreadID string
err := conn.QueryRowContext(ctx, `
SELECT COALESCE(
(SELECT thread_id FROM dependencies WHERE issue_id = ? AND type = 'replies-to' AND thread_id != '' LIMIT 1),
?
)
`, issue.RepliesTo, issue.RepliesTo).Scan(&parentThreadID)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to get parent thread_id: %w", err)
}
if parentThreadID == "" {
parentThreadID = issue.RepliesTo // Use parent's ID as thread root
}
if err := insertEdge(issue.RepliesTo, types.DepRepliesTo, "{}", parentThreadID); err != nil {
return fmt.Errorf("failed to create replies-to edge: %w", err)
}
}
// RelatesTo -> relates-to dependencies
for _, relatedID := range issue.RelatesTo {
if relatedID != "" {
if err := insertEdge(relatedID, types.DepRelatesTo, "{}", ""); err != nil {
return fmt.Errorf("failed to create relates-to edge for %s: %w", relatedID, err)
}
}
}
// DuplicateOf -> duplicates dependency
if issue.DuplicateOf != "" {
if err := insertEdge(issue.DuplicateOf, types.DepDuplicates, "{}", ""); err != nil {
return fmt.Errorf("failed to create duplicates edge: %w", err)
}
}
// SupersededBy -> supersedes dependency (reversed: this issue is superseded BY another)
// So we create: this issue depends on the superseding issue
if issue.SupersededBy != "" {
if err := insertEdge(issue.SupersededBy, types.DepSupersedes, "{}", ""); err != nil {
return fmt.Errorf("failed to create supersedes edge: %w", err)
}
}
return nil
}
// createGraphEdgesFromUpdates creates dependency edges when graph fields are updated.
// This implements Phase 2 of Edge Schema Consolidation (Decision 004) for UpdateIssue.
func createGraphEdgesFromUpdates(ctx context.Context, tx *sql.Tx, issueID string, updates map[string]interface{}, actor string) error {
now := time.Now()
// Helper to insert a dependency edge
insertEdge := func(toID string, edgeType types.DependencyType, metadata, threadID string) error {
_, err := tx.ExecContext(ctx, `
INSERT OR IGNORE INTO dependencies (issue_id, depends_on_id, type, created_at, created_by, metadata, thread_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, issueID, toID, edgeType, now, actor, metadata, threadID)
return err
}
// RepliesTo -> replies-to dependency with thread_id
if repliesTo, ok := updates["replies_to"]; ok {
if replyID, isString := repliesTo.(string); isString && replyID != "" {
// Compute thread_id
var parentThreadID string
err := tx.QueryRowContext(ctx, `
SELECT COALESCE(
(SELECT thread_id FROM dependencies WHERE issue_id = ? AND type = 'replies-to' AND thread_id != '' LIMIT 1),
?
)
`, replyID, replyID).Scan(&parentThreadID)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to get parent thread_id: %w", err)
}
if parentThreadID == "" {
parentThreadID = replyID
}
if err := insertEdge(replyID, types.DepRepliesTo, "{}", parentThreadID); err != nil {
return fmt.Errorf("failed to create replies-to edge: %w", err)
}
}
}
// RelatesTo -> relates-to dependencies (JSON string array)
if relatesTo, ok := updates["relates_to"]; ok {
if relatesStr, isString := relatesTo.(string); isString && relatesStr != "" && relatesStr != "[]" {
var relatedIDs []string
if err := json.Unmarshal([]byte(relatesStr), &relatedIDs); err == nil {
for _, relatedID := range relatedIDs {
if relatedID != "" {
if err := insertEdge(relatedID, types.DepRelatesTo, "{}", ""); err != nil {
return fmt.Errorf("failed to create relates-to edge for %s: %w", relatedID, err)
}
}
}
}
}
}
// DuplicateOf -> duplicates dependency
if duplicateOf, ok := updates["duplicate_of"]; ok {
if dupID, isString := duplicateOf.(string); isString && dupID != "" {
if err := insertEdge(dupID, types.DepDuplicates, "{}", ""); err != nil {
return fmt.Errorf("failed to create duplicates edge: %w", err)
}
}
}
// SupersededBy -> supersedes dependency
if supersededBy, ok := updates["superseded_by"]; ok {
if supID, isString := supersededBy.(string); isString && supID != "" {
if err := insertEdge(supID, types.DepSupersedes, "{}", ""); err != nil {
return fmt.Errorf("failed to create supersedes edge: %w", err)
}
}
}
return nil
}
// parseNullableTimeString parses a nullable time string from database TEXT columns.
// The ncruces/go-sqlite3 driver only auto-converts TEXT→time.Time for columns declared
// as DATETIME/DATE/TIME/TIMESTAMP. For TEXT columns (like deleted_at), we must parse manually.
@@ -198,6 +342,11 @@ func (s *SQLiteStorage) CreateIssue(ctx context.Context, issue *types.Issue, act
return wrapDBError("record creation event", err)
}
// Create graph edges for messaging/graph fields (Phase 2: dual-write - Decision 004)
if err := createGraphEdgesFromIssueFields(ctx, conn, issue, actor); err != nil {
return wrapDBError("create graph edges from issue fields", err)
}
// Mark issue as dirty for incremental export
if err := markDirty(ctx, conn, issue.ID); err != nil {
return wrapDBError("mark issue dirty", err)
@@ -774,6 +923,11 @@ func (s *SQLiteStorage) UpdateIssue(ctx context.Context, id string, updates map[
return fmt.Errorf("failed to record event: %w", err)
}
// Create graph edges for messaging/graph fields (Phase 2: dual-write - Decision 004)
if err := createGraphEdgesFromUpdates(ctx, tx, id, updates, actor); err != nil {
return fmt.Errorf("failed to create graph edges from updates: %w", err)
}
// Mark issue as dirty for incremental export
_, err = tx.ExecContext(ctx, `
INSERT INTO dirty_issues (issue_id, marked_at)