diff --git a/internal/formula/advice.go b/internal/formula/advice.go index e707432f..cfd0d22f 100644 --- a/internal/formula/advice.go +++ b/internal/formula/advice.go @@ -196,10 +196,29 @@ func cloneStep(s *Step) *Step { clone.Labels = make([]string, len(s.Labels)) copy(clone.Labels, s.Labels) } + // Deep copy OnComplete if present (gt-8tmz.8) + if s.OnComplete != nil { + clone.OnComplete = cloneOnComplete(s.OnComplete) + } // Don't deep copy children here - ApplyAdvice handles that recursively return &clone } +// cloneOnComplete creates a deep copy of an OnCompleteSpec (gt-8tmz.8). +func cloneOnComplete(oc *OnCompleteSpec) *OnCompleteSpec { + if oc == nil { + return nil + } + clone := *oc + if len(oc.Vars) > 0 { + clone.Vars = make(map[string]string, len(oc.Vars)) + for k, v := range oc.Vars { + clone.Vars[k] = v + } + } + return &clone +} + // appendUnique appends an item to a slice if not already present. func appendUnique(slice []string, item string) []string { for _, s := range slice { diff --git a/internal/formula/parser_test.go b/internal/formula/parser_test.go index 1e411344..0998ab4c 100644 --- a/internal/formula/parser_test.go +++ b/internal/formula/parser_test.go @@ -777,3 +777,244 @@ func TestParse_NeedsAndWaitsFor(t *testing.T) { t.Errorf("Steps[2].WaitsFor = %q, want 'all-children'", formula.Steps[2].WaitsFor) } } + +// gt-8tmz.8: Tests for on_complete/for-each runtime expansion + +func TestParse_OnComplete(t *testing.T) { + jsonData := `{ + "formula": "mol-patrol", + "version": 1, + "type": "workflow", + "steps": [ + { + "id": "survey-workers", + "title": "Survey workers", + "on_complete": { + "for_each": "output.polecats", + "bond": "mol-polecat-arm", + "vars": { + "polecat_name": "{item.name}", + "rig": "{item.rig}" + }, + "parallel": true + } + }, + { + "id": "aggregate", + "title": "Aggregate results", + "needs": ["survey-workers"], + "waits_for": "all-children" + } + ] +}` + p := NewParser() + formula, err := p.Parse([]byte(jsonData)) + if err != nil { + t.Fatalf("Parse failed: %v", err) + } + + // Validate parsed formula + if err := formula.Validate(); err != nil { + t.Errorf("Validate failed: %v", err) + } + + // Check on_complete field + oc := formula.Steps[0].OnComplete + if oc == nil { + t.Fatal("Steps[0].OnComplete is nil") + } + if oc.ForEach != "output.polecats" { + t.Errorf("ForEach = %q, want 'output.polecats'", oc.ForEach) + } + if oc.Bond != "mol-polecat-arm" { + t.Errorf("Bond = %q, want 'mol-polecat-arm'", oc.Bond) + } + if len(oc.Vars) != 2 { + t.Errorf("len(Vars) = %d, want 2", len(oc.Vars)) + } + if oc.Vars["polecat_name"] != "{item.name}" { + t.Errorf("Vars[polecat_name] = %q, want '{item.name}'", oc.Vars["polecat_name"]) + } + if !oc.Parallel { + t.Error("Parallel should be true") + } +} + +func TestValidate_OnComplete_Valid(t *testing.T) { + formula := &Formula{ + Formula: "mol-valid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "survey", + Title: "Survey", + OnComplete: &OnCompleteSpec{ + ForEach: "output.items", + Bond: "mol-item", + }, + }, + }, + } + + if err := formula.Validate(); err != nil { + t.Errorf("Validate failed for valid on_complete: %v", err) + } +} + +func TestValidate_OnComplete_MissingBond(t *testing.T) { + formula := &Formula{ + Formula: "mol-invalid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "survey", + Title: "Survey", + OnComplete: &OnCompleteSpec{ + ForEach: "output.items", + // Bond is missing + }, + }, + }, + } + + err := formula.Validate() + if err == nil { + t.Error("expected validation error for missing bond") + } + if !strings.Contains(err.Error(), "bond is required") { + t.Errorf("expected 'bond is required' error, got: %v", err) + } +} + +func TestValidate_OnComplete_MissingForEach(t *testing.T) { + formula := &Formula{ + Formula: "mol-invalid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "survey", + Title: "Survey", + OnComplete: &OnCompleteSpec{ + Bond: "mol-item", + // ForEach is missing + }, + }, + }, + } + + err := formula.Validate() + if err == nil { + t.Error("expected validation error for missing for_each") + } + if !strings.Contains(err.Error(), "for_each is required") { + t.Errorf("expected 'for_each is required' error, got: %v", err) + } +} + +func TestValidate_OnComplete_InvalidForEachPath(t *testing.T) { + formula := &Formula{ + Formula: "mol-invalid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "survey", + Title: "Survey", + OnComplete: &OnCompleteSpec{ + ForEach: "items", // Should start with "output." + Bond: "mol-item", + }, + }, + }, + } + + err := formula.Validate() + if err == nil { + t.Error("expected validation error for invalid for_each path") + } + if !strings.Contains(err.Error(), "must start with 'output.'") { + t.Errorf("expected 'must start with output.' error, got: %v", err) + } +} + +func TestValidate_OnComplete_ParallelAndSequential(t *testing.T) { + formula := &Formula{ + Formula: "mol-invalid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "survey", + Title: "Survey", + OnComplete: &OnCompleteSpec{ + ForEach: "output.items", + Bond: "mol-item", + Parallel: true, + Sequential: true, // Can't have both + }, + }, + }, + } + + err := formula.Validate() + if err == nil { + t.Error("expected validation error for parallel + sequential") + } + if !strings.Contains(err.Error(), "cannot set both parallel and sequential") { + t.Errorf("expected 'cannot set both' error, got: %v", err) + } +} + +func TestValidate_OnComplete_Sequential(t *testing.T) { + formula := &Formula{ + Formula: "mol-valid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "process-queue", + Title: "Process queue", + OnComplete: &OnCompleteSpec{ + ForEach: "output.branches", + Bond: "mol-merge", + Sequential: true, + }, + }, + }, + } + + if err := formula.Validate(); err != nil { + t.Errorf("Validate failed for sequential on_complete: %v", err) + } +} + +func TestValidate_OnComplete_InChildren(t *testing.T) { + formula := &Formula{ + Formula: "mol-valid", + Version: 1, + Type: TypeWorkflow, + Steps: []*Step{ + { + ID: "parent", + Title: "Parent", + Children: []*Step{ + { + ID: "child", + Title: "Child", + OnComplete: &OnCompleteSpec{ + ForEach: "output.items", + Bond: "mol-item", + }, + }, + }, + }, + }, + } + + if err := formula.Validate(); err != nil { + t.Errorf("Validate failed for on_complete in child: %v", err) + } +} diff --git a/internal/formula/types.go b/internal/formula/types.go index cf7917ba..420b4a2e 100644 --- a/internal/formula/types.go +++ b/internal/formula/types.go @@ -184,6 +184,10 @@ type Step struct { // Loop defines iteration for this step (gt-8tmz.4). // When set, the step becomes a container that expands its body. Loop *LoopSpec `json:"loop,omitempty"` + + // OnComplete defines actions triggered when this step completes (gt-8tmz.8). + // Used for runtime expansion over step output (the for-each construct). + OnComplete *OnCompleteSpec `json:"on_complete,omitempty"` } // Gate defines an async wait condition (integrates with bd-udsi). @@ -218,6 +222,46 @@ type LoopSpec struct { Body []*Step `json:"body"` } +// OnCompleteSpec defines actions triggered when a step completes (gt-8tmz.8). +// Used for runtime expansion over step output (the for-each construct). +// +// Example YAML: +// +// step: survey-workers +// on_complete: +// for_each: output.polecats +// bond: mol-polecat-arm +// vars: +// polecat_name: "{item.name}" +// rig: "{item.rig}" +// parallel: true +type OnCompleteSpec struct { + // ForEach is the path to the iterable collection in step output. + // Format: "output." or "output.." + // The collection must be an array at runtime. + ForEach string `json:"for_each,omitempty"` + + // Bond is the formula to instantiate for each item. + // A new molecule is created for each element in the ForEach collection. + Bond string `json:"bond,omitempty"` + + // Vars are variable bindings for each iteration. + // Supports placeholders: + // - {item} - the current item value (for primitives) + // - {item.field} - a field from the current item (for objects) + // - {index} - the zero-based iteration index + Vars map[string]string `json:"vars,omitempty"` + + // Parallel runs all bonded molecules concurrently (default behavior). + // Set to true to make this explicit. + Parallel bool `json:"parallel,omitempty"` + + // Sequential runs bonded molecules one at a time. + // Each molecule starts only after the previous one completes. + // Mutually exclusive with Parallel. + Sequential bool `json:"sequential,omitempty"` +} + // BranchRule defines parallel execution paths that rejoin (gt-8tmz.4). // Creates a fork-join pattern: from -> [parallel steps] -> join. type BranchRule struct { @@ -474,6 +518,10 @@ func (f *Formula) Validate() error { errs = append(errs, fmt.Sprintf("steps[%d] (%s): waits_for has invalid value %q (must be all-children or any-children)", i, step.ID, step.WaitsFor)) } } + // Validate on_complete field (gt-8tmz.8) - runtime expansion + if step.OnComplete != nil { + validateOnComplete(step.OnComplete, &errs, fmt.Sprintf("steps[%d] (%s)", i, step.ID)) + } // Validate children's depends_on and needs recursively validateChildDependsOn(step.Children, stepIDLocations, &errs, fmt.Sprintf("steps[%d]", i)) } @@ -566,10 +614,37 @@ func validateChildDependsOn(children []*Step, idLocations map[string]string, err *errs = append(*errs, fmt.Sprintf("%s (%s): waits_for has invalid value %q (must be all-children or any-children)", childPrefix, child.ID, child.WaitsFor)) } } + // Validate on_complete field (gt-8tmz.8) + if child.OnComplete != nil { + validateOnComplete(child.OnComplete, errs, fmt.Sprintf("%s (%s)", childPrefix, child.ID)) + } validateChildDependsOn(child.Children, idLocations, errs, childPrefix) } } +// validateOnComplete validates an OnCompleteSpec (gt-8tmz.8). +func validateOnComplete(oc *OnCompleteSpec, errs *[]string, prefix string) { + // Check that for_each and bond are both present or both absent + if oc.ForEach != "" && oc.Bond == "" { + *errs = append(*errs, fmt.Sprintf("%s.on_complete: bond is required when for_each is set", prefix)) + } + if oc.ForEach == "" && oc.Bond != "" { + *errs = append(*errs, fmt.Sprintf("%s.on_complete: for_each is required when bond is set", prefix)) + } + + // Validate for_each path format + if oc.ForEach != "" { + if !strings.HasPrefix(oc.ForEach, "output.") { + *errs = append(*errs, fmt.Sprintf("%s.on_complete: for_each must start with 'output.' (got %q)", prefix, oc.ForEach)) + } + } + + // Check parallel and sequential are mutually exclusive + if oc.Parallel && oc.Sequential { + *errs = append(*errs, fmt.Sprintf("%s.on_complete: cannot set both parallel and sequential", prefix)) + } +} + // GetRequiredVars returns the names of all required variables. func (f *Formula) GetRequiredVars() []string { var required []string