Execute the chord. Executing the chord means executing the header and sending the result to the body. In case of an empty header, the body is executed immediately. Arguments: header (group): The header to execute. body (Signature): The body t
(self, header, body, partial_args, app=None, interval=None,
countdown=1, max_retries=None, eager=False,
task_id=None, kwargs=None, **options)
| 2203 | return sum(self._descend(task) for task in tasks) |
| 2204 | |
| 2205 | def run(self, header, body, partial_args, app=None, interval=None, |
| 2206 | countdown=1, max_retries=None, eager=False, |
| 2207 | task_id=None, kwargs=None, **options): |
| 2208 | """Execute the chord. |
| 2209 | |
| 2210 | Executing the chord means executing the header and sending the |
| 2211 | result to the body. In case of an empty header, the body is |
| 2212 | executed immediately. |
| 2213 | |
| 2214 | Arguments: |
| 2215 | header (group): The header to execute. |
| 2216 | body (Signature): The body to execute. |
| 2217 | partial_args (tuple): Arguments to pass to the header. |
| 2218 | app (Celery): The Celery app instance. |
| 2219 | interval (float): The interval between retries. |
| 2220 | countdown (int): The countdown between retries. |
| 2221 | max_retries (int): The maximum number of retries. |
| 2222 | task_id (str): The task id to use for the body. |
| 2223 | kwargs (dict): Keyword arguments to pass to the header. |
| 2224 | options (dict): Options to pass to the header. |
| 2225 | |
| 2226 | Returns: |
| 2227 | AsyncResult: The result of the body (with the result of the header in the parent of the body). |
| 2228 | """ |
| 2229 | app = app or self._get_app(body) |
| 2230 | group_id = header.options.get('task_id') or uuid() |
| 2231 | root_id = body.options.get('root_id') |
| 2232 | options = dict(self.options, **options) if options else self.options |
| 2233 | if options: |
| 2234 | options.pop('task_id', None) |
| 2235 | body.options.update(options) |
| 2236 | |
| 2237 | body_task_id = task_id or uuid() |
| 2238 | bodyres = body.freeze(body_task_id, group_id=group_id, root_id=root_id) |
| 2239 | |
| 2240 | # Chains should not be passed to the header tasks. See #3771 |
| 2241 | options.pop('chain', None) |
| 2242 | # Neither should chords, for deeply nested chords to work |
| 2243 | options.pop('chord', None) |
| 2244 | options.pop('task_id', None) |
| 2245 | |
| 2246 | header_result_args = header._freeze_group_tasks(group_id=group_id, chord=body, root_id=root_id) |
| 2247 | |
| 2248 | if header.tasks: |
| 2249 | app.backend.apply_chord( |
| 2250 | header_result_args, |
| 2251 | body, |
| 2252 | interval=interval, |
| 2253 | countdown=countdown, |
| 2254 | max_retries=max_retries, |
| 2255 | ) |
| 2256 | header_result = header.apply_async(partial_args, kwargs, task_id=group_id, **options) |
| 2257 | # The execution of a chord body is normally triggered by its header's |
| 2258 | # tasks completing. If the header is empty this will never happen, so |
| 2259 | # we execute the body manually here. |
| 2260 | else: |
| 2261 | body.delay([]) |
| 2262 | header_result = self.app.GroupResult(*header_result_args) |
no test coverage detected