feat: Add on_complete/for-each runtime expansion types (gt-8tmz.8)

Implements the schema and validation for runtime dynamic expansion:

- Add OnCompleteSpec type for step completion triggers
- Add on_complete field to Step struct
- Validate for_each path format (must start with "output.")
- Validate bond is required with for_each (and vice versa)
- Validate parallel and sequential are mutually exclusive
- Add cloneOnComplete for proper step cloning
- Add comprehensive tests for parsing and validation

The runtime executor (in gastown) will interpret these fields to:
- Bond N molecules when step completes based on output.collection
- Run bonded molecules in parallel or sequential order
- Pass item/index context via vars substitution

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Steve Yegge
2025-12-25 16:07:57 -08:00
parent 8b5168f1a3
commit a42fac8cb9
3 changed files with 335 additions and 0 deletions

View File

@@ -196,10 +196,29 @@ func cloneStep(s *Step) *Step {
clone.Labels = make([]string, len(s.Labels))
copy(clone.Labels, s.Labels)
}
// Deep copy OnComplete if present (gt-8tmz.8)
if s.OnComplete != nil {
clone.OnComplete = cloneOnComplete(s.OnComplete)
}
// Don't deep copy children here - ApplyAdvice handles that recursively
return &clone
}
// cloneOnComplete creates a deep copy of an OnCompleteSpec (gt-8tmz.8).
func cloneOnComplete(oc *OnCompleteSpec) *OnCompleteSpec {
if oc == nil {
return nil
}
clone := *oc
if len(oc.Vars) > 0 {
clone.Vars = make(map[string]string, len(oc.Vars))
for k, v := range oc.Vars {
clone.Vars[k] = v
}
}
return &clone
}
// appendUnique appends an item to a slice if not already present.
func appendUnique(slice []string, item string) []string {
for _, s := range slice {

View File

@@ -777,3 +777,244 @@ func TestParse_NeedsAndWaitsFor(t *testing.T) {
t.Errorf("Steps[2].WaitsFor = %q, want 'all-children'", formula.Steps[2].WaitsFor)
}
}
// gt-8tmz.8: Tests for on_complete/for-each runtime expansion
func TestParse_OnComplete(t *testing.T) {
jsonData := `{
"formula": "mol-patrol",
"version": 1,
"type": "workflow",
"steps": [
{
"id": "survey-workers",
"title": "Survey workers",
"on_complete": {
"for_each": "output.polecats",
"bond": "mol-polecat-arm",
"vars": {
"polecat_name": "{item.name}",
"rig": "{item.rig}"
},
"parallel": true
}
},
{
"id": "aggregate",
"title": "Aggregate results",
"needs": ["survey-workers"],
"waits_for": "all-children"
}
]
}`
p := NewParser()
formula, err := p.Parse([]byte(jsonData))
if err != nil {
t.Fatalf("Parse failed: %v", err)
}
// Validate parsed formula
if err := formula.Validate(); err != nil {
t.Errorf("Validate failed: %v", err)
}
// Check on_complete field
oc := formula.Steps[0].OnComplete
if oc == nil {
t.Fatal("Steps[0].OnComplete is nil")
}
if oc.ForEach != "output.polecats" {
t.Errorf("ForEach = %q, want 'output.polecats'", oc.ForEach)
}
if oc.Bond != "mol-polecat-arm" {
t.Errorf("Bond = %q, want 'mol-polecat-arm'", oc.Bond)
}
if len(oc.Vars) != 2 {
t.Errorf("len(Vars) = %d, want 2", len(oc.Vars))
}
if oc.Vars["polecat_name"] != "{item.name}" {
t.Errorf("Vars[polecat_name] = %q, want '{item.name}'", oc.Vars["polecat_name"])
}
if !oc.Parallel {
t.Error("Parallel should be true")
}
}
func TestValidate_OnComplete_Valid(t *testing.T) {
formula := &Formula{
Formula: "mol-valid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "survey",
Title: "Survey",
OnComplete: &OnCompleteSpec{
ForEach: "output.items",
Bond: "mol-item",
},
},
},
}
if err := formula.Validate(); err != nil {
t.Errorf("Validate failed for valid on_complete: %v", err)
}
}
func TestValidate_OnComplete_MissingBond(t *testing.T) {
formula := &Formula{
Formula: "mol-invalid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "survey",
Title: "Survey",
OnComplete: &OnCompleteSpec{
ForEach: "output.items",
// Bond is missing
},
},
},
}
err := formula.Validate()
if err == nil {
t.Error("expected validation error for missing bond")
}
if !strings.Contains(err.Error(), "bond is required") {
t.Errorf("expected 'bond is required' error, got: %v", err)
}
}
func TestValidate_OnComplete_MissingForEach(t *testing.T) {
formula := &Formula{
Formula: "mol-invalid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "survey",
Title: "Survey",
OnComplete: &OnCompleteSpec{
Bond: "mol-item",
// ForEach is missing
},
},
},
}
err := formula.Validate()
if err == nil {
t.Error("expected validation error for missing for_each")
}
if !strings.Contains(err.Error(), "for_each is required") {
t.Errorf("expected 'for_each is required' error, got: %v", err)
}
}
func TestValidate_OnComplete_InvalidForEachPath(t *testing.T) {
formula := &Formula{
Formula: "mol-invalid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "survey",
Title: "Survey",
OnComplete: &OnCompleteSpec{
ForEach: "items", // Should start with "output."
Bond: "mol-item",
},
},
},
}
err := formula.Validate()
if err == nil {
t.Error("expected validation error for invalid for_each path")
}
if !strings.Contains(err.Error(), "must start with 'output.'") {
t.Errorf("expected 'must start with output.' error, got: %v", err)
}
}
func TestValidate_OnComplete_ParallelAndSequential(t *testing.T) {
formula := &Formula{
Formula: "mol-invalid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "survey",
Title: "Survey",
OnComplete: &OnCompleteSpec{
ForEach: "output.items",
Bond: "mol-item",
Parallel: true,
Sequential: true, // Can't have both
},
},
},
}
err := formula.Validate()
if err == nil {
t.Error("expected validation error for parallel + sequential")
}
if !strings.Contains(err.Error(), "cannot set both parallel and sequential") {
t.Errorf("expected 'cannot set both' error, got: %v", err)
}
}
func TestValidate_OnComplete_Sequential(t *testing.T) {
formula := &Formula{
Formula: "mol-valid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "process-queue",
Title: "Process queue",
OnComplete: &OnCompleteSpec{
ForEach: "output.branches",
Bond: "mol-merge",
Sequential: true,
},
},
},
}
if err := formula.Validate(); err != nil {
t.Errorf("Validate failed for sequential on_complete: %v", err)
}
}
func TestValidate_OnComplete_InChildren(t *testing.T) {
formula := &Formula{
Formula: "mol-valid",
Version: 1,
Type: TypeWorkflow,
Steps: []*Step{
{
ID: "parent",
Title: "Parent",
Children: []*Step{
{
ID: "child",
Title: "Child",
OnComplete: &OnCompleteSpec{
ForEach: "output.items",
Bond: "mol-item",
},
},
},
},
},
}
if err := formula.Validate(); err != nil {
t.Errorf("Validate failed for on_complete in child: %v", err)
}
}

View File

@@ -184,6 +184,10 @@ type Step struct {
// Loop defines iteration for this step (gt-8tmz.4).
// When set, the step becomes a container that expands its body.
Loop *LoopSpec `json:"loop,omitempty"`
// OnComplete defines actions triggered when this step completes (gt-8tmz.8).
// Used for runtime expansion over step output (the for-each construct).
OnComplete *OnCompleteSpec `json:"on_complete,omitempty"`
}
// Gate defines an async wait condition (integrates with bd-udsi).
@@ -218,6 +222,46 @@ type LoopSpec struct {
Body []*Step `json:"body"`
}
// OnCompleteSpec defines actions triggered when a step completes (gt-8tmz.8).
// Used for runtime expansion over step output (the for-each construct).
//
// Example YAML:
//
// step: survey-workers
// on_complete:
// for_each: output.polecats
// bond: mol-polecat-arm
// vars:
// polecat_name: "{item.name}"
// rig: "{item.rig}"
// parallel: true
type OnCompleteSpec struct {
// ForEach is the path to the iterable collection in step output.
// Format: "output.<field>" or "output.<field>.<nested>"
// The collection must be an array at runtime.
ForEach string `json:"for_each,omitempty"`
// Bond is the formula to instantiate for each item.
// A new molecule is created for each element in the ForEach collection.
Bond string `json:"bond,omitempty"`
// Vars are variable bindings for each iteration.
// Supports placeholders:
// - {item} - the current item value (for primitives)
// - {item.field} - a field from the current item (for objects)
// - {index} - the zero-based iteration index
Vars map[string]string `json:"vars,omitempty"`
// Parallel runs all bonded molecules concurrently (default behavior).
// Set to true to make this explicit.
Parallel bool `json:"parallel,omitempty"`
// Sequential runs bonded molecules one at a time.
// Each molecule starts only after the previous one completes.
// Mutually exclusive with Parallel.
Sequential bool `json:"sequential,omitempty"`
}
// BranchRule defines parallel execution paths that rejoin (gt-8tmz.4).
// Creates a fork-join pattern: from -> [parallel steps] -> join.
type BranchRule struct {
@@ -474,6 +518,10 @@ func (f *Formula) Validate() error {
errs = append(errs, fmt.Sprintf("steps[%d] (%s): waits_for has invalid value %q (must be all-children or any-children)", i, step.ID, step.WaitsFor))
}
}
// Validate on_complete field (gt-8tmz.8) - runtime expansion
if step.OnComplete != nil {
validateOnComplete(step.OnComplete, &errs, fmt.Sprintf("steps[%d] (%s)", i, step.ID))
}
// Validate children's depends_on and needs recursively
validateChildDependsOn(step.Children, stepIDLocations, &errs, fmt.Sprintf("steps[%d]", i))
}
@@ -566,10 +614,37 @@ func validateChildDependsOn(children []*Step, idLocations map[string]string, err
*errs = append(*errs, fmt.Sprintf("%s (%s): waits_for has invalid value %q (must be all-children or any-children)", childPrefix, child.ID, child.WaitsFor))
}
}
// Validate on_complete field (gt-8tmz.8)
if child.OnComplete != nil {
validateOnComplete(child.OnComplete, errs, fmt.Sprintf("%s (%s)", childPrefix, child.ID))
}
validateChildDependsOn(child.Children, idLocations, errs, childPrefix)
}
}
// validateOnComplete validates an OnCompleteSpec (gt-8tmz.8).
func validateOnComplete(oc *OnCompleteSpec, errs *[]string, prefix string) {
// Check that for_each and bond are both present or both absent
if oc.ForEach != "" && oc.Bond == "" {
*errs = append(*errs, fmt.Sprintf("%s.on_complete: bond is required when for_each is set", prefix))
}
if oc.ForEach == "" && oc.Bond != "" {
*errs = append(*errs, fmt.Sprintf("%s.on_complete: for_each is required when bond is set", prefix))
}
// Validate for_each path format
if oc.ForEach != "" {
if !strings.HasPrefix(oc.ForEach, "output.") {
*errs = append(*errs, fmt.Sprintf("%s.on_complete: for_each must start with 'output.' (got %q)", prefix, oc.ForEach))
}
}
// Check parallel and sequential are mutually exclusive
if oc.Parallel && oc.Sequential {
*errs = append(*errs, fmt.Sprintf("%s.on_complete: cannot set both parallel and sequential", prefix))
}
}
// GetRequiredVars returns the names of all required variables.
func (f *Formula) GetRequiredVars() []string {
var required []string