run walks the plan DAG, executing nodes in parallel where possible while respecting dependency edges. Emits progress events and handles group-based event aggregation for composite operations like recreate.
(ctx context.Context, plan *Plan)
| 88 | // respecting dependency edges. Emits progress events and handles group-based |
| 89 | // event aggregation for composite operations like recreate. |
| 90 | func (exec *planExecutor) run(ctx context.Context, plan *Plan) error { |
| 91 | if plan.IsEmpty() { |
| 92 | return nil |
| 93 | } |
| 94 | |
| 95 | // Build a done-channel per node so dependents can wait |
| 96 | done := make(map[int]chan struct{}, len(plan.Nodes)) |
| 97 | for _, node := range plan.Nodes { |
| 98 | done[node.ID] = make(chan struct{}) |
| 99 | } |
| 100 | |
| 101 | // Track group event state: first node emits Working, last emits Done |
| 102 | groups := exec.buildGroupTracker(plan) |
| 103 | events := exec.compose.events |
| 104 | |
| 105 | eg, ctx := errgroup.WithContext(ctx) |
| 106 | for _, node := range plan.Nodes { |
| 107 | eg.Go(func() error { |
| 108 | // Wait for all dependencies |
| 109 | for _, dep := range node.DependsOn { |
| 110 | select { |
| 111 | case <-done[dep.ID]: |
| 112 | case <-ctx.Done(): |
| 113 | return ctx.Err() |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | // Emit group start event if this is the first node of a group |
| 118 | groups.onNodeStart(node, events) |
| 119 | |
| 120 | err := exec.executeNode(ctx, node) |
| 121 | |
| 122 | if err == nil { |
| 123 | // Emit group done event if this is the last node of a group |
| 124 | groups.onNodeDone(node, events) |
| 125 | } else if ctx.Err() == nil { |
| 126 | groups.onNodeError(node, events, err) |
| 127 | } |
| 128 | |
| 129 | close(done[node.ID]) |
| 130 | return err |
| 131 | }) |
| 132 | } |
| 133 | |
| 134 | return eg.Wait() |
| 135 | } |
| 136 | |
| 137 | // executeNode dispatches a single plan node to the appropriate API call. |
| 138 | func (exec *planExecutor) executeNode(ctx context.Context, node *PlanNode) error { |