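Prepare the chain for execution. To execute a chain, we first need to unpack it correctly. During the unpacking, we might encounter other chains, groups, or chords, which we need to unpack as well. For example, chain(signature1, chain(signature2, signature3)) is upgraded to chain(signature1, signature2, signature3), and chain(group(signature1, signature2), signature3) is upgraded to chord([signature1, signature2], signature3).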
(self, args, kwargs, tasks,
root_id=None, parent_id=None, link_error=None, app=None,
last_task_id=None, group_id=None, chord_body=None,
clone=True, from_dict=Signature.from_dict,
group_index=None)
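The unpacking described above can be illustrated with a small, self-contained sketch. It is not Celery's implementation: `Chain`, `Group`, `Chord`, and `unpack` are hypothetical stand-ins introduced only for this example, and the freezing of tasks, the results, and the callback wiring are all left out. The sketch only shows how walking the steps from the end with a deque lets nested chains be spliced in place, and how a group followed by another step can be turned into a chord.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Chain:
    """Hypothetical stand-in for a chain signature."""
    tasks: List[Any]


@dataclass
class Group:
    """Hypothetical stand-in for a group signature."""
    tasks: List[Any]


@dataclass
class Chord:
    """Hypothetical stand-in for a chord: a header plus a body."""
    header: List[Any]
    body: Any


def unpack(tasks):
    """Flatten nested chains and upgrade 'group, then step' to a chord."""
    steps = deque(tasks)
    unpacked = []  # filled back to front, so it is in reverse order
    while steps:
        task = steps.pop()  # take the last remaining step
        if isinstance(task, Chain):
            # Splice the nested chain's steps back onto the deque
            # so they are processed like top-level steps.
            steps.extend(task.tasks)
            continue
        if isinstance(task, Group) and unpacked:
            # A group followed by another step becomes a chord whose
            # body is that following step.
            body = unpacked.pop()
            task = Chord(header=list(task.tasks), body=body)
        unpacked.append(task)
    unpacked.reverse()  # restore execution order
    return unpacked


# Nested chain: flattened into a single list of steps.
flat = unpack(["signature1", Chain(["signature2", "signature3"])])
# flat == ["signature1", "signature2", "signature3"]

# Group followed by a step: upgraded to a chord.
upgraded = unpack([Group(["signature1", "signature2"]), "signature3"])
# upgraded == [Chord(header=["signature1", "signature2"], body="signature3")]
```

Processing from the end means that, when a group is reached, the step that follows it in execution order has already been handled and can be taken as the chord body. A group that is the last step has nothing after it and is left as a group in this sketch.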
| 1115 | visitor.on_chain_end(self, **headers) |
| 1116 | |
| 1117 | def prepare_steps(self, args, kwargs, tasks, |
| 1118 | root_id=None, parent_id=None, link_error=None, app=None, |
| 1119 | last_task_id=None, group_id=None, chord_body=None, |
| 1120 | clone=True, from_dict=Signature.from_dict, |
| 1121 | group_index=None): |
| 1122 | """Prepare the chain for execution. |
| 1123 | |
| 1124 | To execute a chain, we first need to unpack it correctly. |
| 1125 | During the unpacking, we might encounter other chains, groups, or chords |
| 1126 | which we need to unpack as well. |
| 1127 | |
| 1128 | For example: |
| 1129 | chain(signature1, chain(signature2, signature3)) --> Upgrades to chain(signature1, signature2, signature3) |
| 1130 | chain(group(signature1, signature2), signature3) --> Upgrades to chord([signature1, signature2], signature3) |
| 1131 | |
| 1132 | The responsibility of this method is to ensure that the chain is |
| 1133 | correctly unpacked, and then the correct callbacks are set up along the way. |
| 1134 | |
| 1135 | Arguments: |
| 1136 | args (Tuple): Partial args to be prepended to the existing args. |
| 1137 | kwargs (Dict): Partial kwargs to be merged with existing kwargs. |
| 1138 | tasks (List[Signature]): The tasks of the chain. |
| 1139 | root_id (str): The id of the root task. |
| 1140 | parent_id (str): The id of the parent task. |
| 1141 | link_error (Union[List[Signature], Signature]): The error callback, |
| 1142 | which will be set for all tasks in the chain. |
| 1143 | app (Celery): The Celery app instance. |
| 1144 | last_task_id (str): The id of the last task in the chain. |
| 1145 | group_id (str): The id of the group that the chain is a part of. |
| 1146 | chord_body (Signature): The body of the chord, used to synchronize with the chain's |
| 1147 | last task and the chord's body when used together. |
| 1148 | clone (bool): Whether to clone the chain's tasks before modifying them. |
| 1149 | from_dict (Callable): A function that takes a dict and returns a Signature. |
| 1150 | |
| 1151 | Returns: |
| 1152 | Tuple[List[Signature], List[AsyncResult]]: The frozen tasks of the chain, and the async results |
| 1153 | """ |
| 1154 | app = app or self.app |
| 1155 | # use chain message field for protocol 2 and later. |
| 1156 | # this avoids pickle blowing the stack on the recursion |
| 1157 | # required by linking tasks together in a tree structure. |
| 1158 | # (why is pickle using recursion? or better yet why cannot python |
| 1159 | # do tail call optimization making recursion actually useful?) |
| 1160 | use_link = self._use_link |
| 1161 | if use_link is None and app.conf.task_protocol == 1: |
| 1162 | use_link = True |
| 1163 | steps = deque(tasks) |
| 1164 | |
| 1165 | # optimization: now the pop func is a local variable |
| 1166 | steps_pop = steps.pop |
| 1167 | steps_extend = steps.extend |
| 1168 | |
| 1169 | prev_task = None |
| 1170 | prev_res = None |
| 1171 | tasks, results = [], [] |
| 1172 | i = 0 |
| 1173 | # NOTE: We are doing this in reverse order. |
| 1174 | # The result is a list of tasks in reverse order, that is |