Executes the chain. Responsible for executing the chain's tasks in the correct order. In the case of a chain with a single task, that task is executed directly and its own result is returned.
(self, args=None, kwargs=None, group_id=None, chord=None,
task_id=None, link=None, link_error=None, publisher=None,
producer=None, root_id=None, parent_id=None, app=None,
group_index=None, **options)
| 1043 | dict(self.options, **options) if options else self.options)) |
| 1044 | |
def run(self, args=None, kwargs=None, group_id=None, chord=None,
        task_id=None, link=None, link_error=None, publisher=None,
        producer=None, root_id=None, parent_id=None, app=None,
        group_index=None, **options):
    """Executes the chain.

    Responsible for executing the chain in the correct order.
    In the case of a chain with a single task, the task is executed
    directly and the result is returned for that task specifically.

    Arguments:
        args: Extra positional arguments. When given and the chain is not
            immutable they are placed in front of ``self.args``; otherwise
            ``self.args`` is used unchanged.
        kwargs: Extra keyword arguments forwarded to ``prepare_steps``;
            an empty dict is used when none are given.
        group_id, chord, task_id, link_error, root_id, parent_id,
        group_index: Forwarded unchanged to ``prepare_steps``.
        link: Callback(s) added to the ``link`` option of ``tasks[0]``
            as returned by ``prepare_steps``.
        publisher, producer: Accepted for signature compatibility; this
            method does not read them.
        app: Application to use; falls back to ``self.app``.
        **options: Passed through ``_prepare_chain_from_options`` and then
            to ``apply_async`` of the task that is started.

    Returns:
        The object returned by ``apply_async`` when the chain consisted of
        a single task, the first entry of the results produced by
        ``prepare_steps`` when it had several, or ``None`` when
        ``prepare_steps`` produced no results.
    """
    # pylint: disable=redefined-outer-name
    # XXX chord is also a class in outer scope.
    args = args if args else ()
    # Keyword arguments must be a mapping; the empty fallback is a dict so
    # that any downstream ``**kwargs`` / ``dict(...)`` merge stays valid.
    kwargs = kwargs if kwargs else {}
    app = app or self.app

    # With no explicit preference on the chain, an app configured for
    # task protocol 1 switches ``use_link`` on.
    use_link = self._use_link
    if use_link is None and app.conf.task_protocol == 1:
        use_link = True

    args = (tuple(args) + tuple(self.args)
            if args and not self.immutable else self.args)

    # Unpack nested chains/groups/chords
    tasks, results_from_prepare = self.prepare_steps(
        args, kwargs, self.tasks, root_id, parent_id, link_error, app,
        task_id, group_id, chord, group_index=group_index,
    )

    # Nothing was prepared, so there is nothing to start.
    if not results_from_prepare:
        return None

    # For a chain of a single task, execute the task directly and return
    # the result for that task.
    # For a chain of multiple tasks, execute all of the tasks and return
    # the AsyncResult for the chain.
    if link:
        tasks[0].extend_list_option('link', link)
    # NOTE(review): presumably ``prepare_steps`` returns the steps in
    # reverse order, so ``tasks[0]`` is the final step and ``pop()`` yields
    # the step to start first -- confirm against ``prepare_steps``.
    first_task = tasks.pop()
    options = _prepare_chain_from_options(options, tasks, use_link)

    result_from_apply = first_task.apply_async(**options)
    # If we only have a single task, it may be important that we pass
    # the real result object rather than the one obtained via freezing.
    # e.g. For `GroupResult`s, we need to pass back the result object
    # which will actually have its promise fulfilled by the subtasks,
    # something that will never occur for the frozen result.
    if not tasks:
        return result_from_apply
    return results_from_prepare[0]
| 1090 | |
| 1091 | # in order for a chain to be frozen, each of the members of the chain individually needs to be frozen |
| 1092 | # TODO figure out why we are always cloning before freeze |
no test coverage detected