Method prepare_steps

celery/canvas.py:1117–1278

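Prepare the chain for execution. To execute a chain, it must first be unpacked correctly. Unpacking may encounter nested chains, groups, or chords, which must be unpacked as well. For example, chain(signature1, chain(signature2, signature3)) is flattened to chain(signature1, signature2, signature3), and chain(group(signature1, signature2), signature3) is upgraded to chord([signature1, signature2], signature3).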
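The flattening of a nested chain can be illustrated with a minimal, standard-library-only sketch. The `Chain` class and `flatten` function below are hypothetical stand-ins invented for this illustration; they are not Celery's classes, and the real method does considerably more (cloning, freezing, linking).

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class Chain:
    """Hypothetical stand-in for a chain: an ordered list of steps."""
    steps: List[Union[str, "Chain"]] = field(default_factory=list)


def flatten(chain: Chain) -> List[str]:
    """Return the chain's steps with every nested Chain expanded in place."""
    flat: List[str] = []
    for step in chain.steps:
        if isinstance(step, Chain):
            flat.extend(flatten(step))  # splice the inner chain's steps
        else:
            flat.append(step)
    return flat


nested = Chain(["signature1", Chain(["signature2", "signature3"])])
print(flatten(nested))  # ['signature1', 'signature2', 'signature3']
```

The sketch uses recursion for brevity; the listing further down suggests the real method works iteratively on a deque instead.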

(self, args, kwargs, tasks,
 root_id=None, parent_id=None, link_error=None, app=None,
 last_task_id=None, group_id=None, chord_body=None,
 clone=True, from_dict=Signature.from_dict,
 group_index=None)
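According to the method's docstring, `args` are partial arguments prepended to a task's existing args, and `kwargs` are partial keyword arguments merged with its existing kwargs. A minimal sketch of that merge follows. The helper name `merge_partial` is hypothetical, and the choice that a partial kwarg overrides an existing key with the same name is an assumption of this sketch, not something the docstring states.

```python
from typing import Any, Dict, Tuple


def merge_partial(
    partial_args: Tuple[Any, ...],
    partial_kwargs: Dict[str, Any],
    existing_args: Tuple[Any, ...],
    existing_kwargs: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Prepend partial args and merge partial kwargs (hypothetical helper)."""
    merged_args = tuple(partial_args) + tuple(existing_args)
    # Assumption: on a key clash, the partial value wins.
    merged_kwargs = {**existing_kwargs, **partial_kwargs}
    return merged_args, merged_kwargs


args, kwargs = merge_partial(
    (1,), {"retries": 3}, (2, 3), {"retries": 0, "queue": "default"}
)
print(args)    # (1, 2, 3)
print(kwargs)  # {'retries': 3, 'queue': 'default'}
```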

Source from the content-addressed store, hash-verified

            visitor.on_chain_end(self, **headers)

    def prepare_steps(self, args, kwargs, tasks,
                      root_id=None, parent_id=None, link_error=None, app=None,
                      last_task_id=None, group_id=None, chord_body=None,
                      clone=True, from_dict=Signature.from_dict,
                      group_index=None):
        """Prepare the chain for execution.

        To execute a chain, we first need to unpack it correctly.
        During the unpacking, we might encounter other chains, groups, or chords
        which we need to unpack as well.

        For example:
        chain(signature1, chain(signature2, signature3)) --> Upgrades to chain(signature1, signature2, signature3)
        chain(group(signature1, signature2), signature3) --> Upgrades to chord([signature1, signature2], signature3)

        The responsibility of this method is to ensure that the chain is
        correctly unpacked, and then the correct callbacks are set up along the way.

        Arguments:
            args (Tuple): Partial args to be prepended to the existing args.
            kwargs (Dict): Partial kwargs to be merged with existing kwargs.
            tasks (List[Signature]): The tasks of the chain.
            root_id (str): The id of the root task.
            parent_id (str): The id of the parent task.
            link_error (Union[List[Signature], Signature]): The error callback.
                will be set for all tasks in the chain.
            app (Celery): The Celery app instance.
            last_task_id (str): The id of the last task in the chain.
            group_id (str): The id of the group that the chain is a part of.
            chord_body (Signature): The body of the chord, used to synchronize with the chain's
                last task and the chord's body when used together.
            clone (bool): Whether to clone the chain's tasks before modifying them.
            from_dict (Callable): A function that takes a dict and returns a Signature.

        Returns:
            Tuple[List[Signature], List[AsyncResult]]: The frozen tasks of the chain, and the async results
        """
        app = app or self.app
        # use chain message field for protocol 2 and later.
        # this avoids pickle blowing the stack on the recursion
        # required by linking task together in a tree structure.
        # (why is pickle using recursion? or better yet why cannot python
        # do tail call optimization making recursion actually useful?)
        use_link = self._use_link
        if use_link is None and app.conf.task_protocol == 1:
            use_link = True
        steps = deque(tasks)

        # optimization: now the pop func is a local variable
        steps_pop = steps.pop
        steps_extend = steps.extend

        prev_task = None
        prev_res = None
        tasks, results = [], []
        i = 0
        # NOTE: We are doing this in reverse order.
        # The result is a list of tasks in reverse order, that is
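The listing ends where the main loop begins, so the sketch below does not reproduce Celery's loop. It is a simplified, standard-library-only model of the technique that the visible setup points to (a deque popped from the right, nested chains pushed back with `extend`, and a remembered "previous" step), combined with the two rewrites named in the docstring. The tagged tuples and the `prepare` function are hypothetical; the real method also clones, freezes, and links signatures and collects results.

```python
from collections import deque
from typing import Any, List, Tuple

# Hypothetical tagged tuples standing in for canvas primitives:
#   ("task", name), ("chain", [steps...]), ("group", [members...])
Step = Tuple[str, Any]


def prepare(tasks: List[Step]) -> List[Step]:
    """Unpack nested chains and upgrade group-then-step to a chord."""
    steps = deque(tasks)
    prepared: List[Step] = []  # built in reverse execution order
    prev = None  # the step that runs right after the one being processed
    while steps:
        step = steps.pop()  # take from the right: last step first
        kind, payload = step
        if kind == "chain":
            # Unroll: push the inner steps back so they are processed next,
            # still right to left.
            steps.extend(payload)
            continue
        if kind == "group" and prev is not None:
            # A group followed by another step becomes a chord whose body is
            # that following step, so the body stops being a separate step.
            prepared.pop()
            step = ("chord", (payload, prev))
        prepared.append(step)
        prev = step
    prepared.reverse()  # restore execution order
    return prepared


flat = prepare([
    ("task", "signature1"),
    ("chain", [("task", "signature2"), ("task", "signature3")]),
])
print(flat)

upgraded = prepare([
    ("group", [("task", "signature1"), ("task", "signature2")]),
    ("task", "signature3"),
])
print(upgraded)
```

Walking the steps from the end means that, when a group is reached, the step that follows it is already known, which is what allows the group and its successor to be replaced by a single chord in this model.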

Callers 5

run (Method) · 0.95
freeze (Method) · 0.95
test_group_to_chord (Method) · 0.80

Calls 9

maybe_unroll_group (Function) · 0.85
chord (Function) · 0.85
maybe_list (Function) · 0.85
clone (Method) · 0.45
pop (Method) · 0.45
freeze (Method) · 0.45
link (Method) · 0.45
link_error (Method) · 0.45
ensure_chords_allowed (Method) · 0.45