hub / github.com/celery/celery / run

Method run

celery/canvas.py:1045–1089

Executes the chain, running its tasks in the correct order. For a chain that contains a single task, that task is executed directly and its own result is returned.
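The return rule described above can be modelled without Celery. The sketch below is only an illustration: `FakeResult` and `select_return_value` are hypothetical names, and the list ordering (index 0 holding the result that represents the whole chain) is an assumption read off the listing, where `results_from_prepare[0]` is what `run` returns for a multi-task chain.

```python
from collections import namedtuple

# Hypothetical stand-in for a result object; not a Celery class.
FakeResult = namedtuple("FakeResult", "label")


def select_return_value(result_from_apply, results_from_prepare, remaining_tasks):
    """Mirror the final branch of `run` (illustrative only).

    `remaining_tasks` is what is left after the first task was popped
    off and dispatched.
    """
    if not remaining_tasks:
        # Single-task chain: hand back the live result from apply_async.
        return result_from_apply
    # Multi-task chain: hand back the frozen result prepared for the chain.
    return results_from_prepare[0]


live = FakeResult("live")
frozen = [FakeResult("frozen-0"), FakeResult("frozen-1")]

single = select_return_value(live, frozen[:1], remaining_tasks=[])
multi = select_return_value(live, frozen, remaining_tasks=["next step"])
```

In this model `single` is the live object and `multi` is the first frozen entry, which matches the comment in the source about passing back the real result object for a single task.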

(self, args=None, kwargs=None, group_id=None, chord=None,
            task_id=None, link=None, link_error=None, publisher=None,
            producer=None, root_id=None, parent_id=None, app=None,
            group_index=None, **options)

Source from the content-addressed store, hash-verified

1043 dict(self.options, **options) if options else self.options))
1044
1045     def run(self, args=None, kwargs=None, group_id=None, chord=None,
1046             task_id=None, link=None, link_error=None, publisher=None,
1047             producer=None, root_id=None, parent_id=None, app=None,
1048             group_index=None, **options):
1049         """Executes the chain.
1050
1051         Responsible for executing the chain in the correct order.
1052         In a case of a chain of a single task, the task is executed directly
1053         and the result is returned for that task specifically.
1054         """
1055         # pylint: disable=redefined-outer-name
1056         # XXX chord is also a class in outer scope.
1057         args = args if args else ()
1058         kwargs = kwargs if kwargs else []
1059         app = app or self.app
1060         use_link = self._use_link
1061         if use_link is None and app.conf.task_protocol == 1:
1062             use_link = True
1063         args = (tuple(args) + tuple(self.args)
1064                 if args and not self.immutable else self.args)
1065
1066         # Unpack nested chains/groups/chords
1067         tasks, results_from_prepare = self.prepare_steps(
1068             args, kwargs, self.tasks, root_id, parent_id, link_error, app,
1069             task_id, group_id, chord, group_index=group_index,
1070         )
1071
1072         # For a chain of single task, execute the task directly and return the result for that task
1073         # For a chain of multiple tasks, execute all of the tasks and return the AsyncResult for the chain
1074         if results_from_prepare:
1075             if link:
1076                 tasks[0].extend_list_option('link', link)
1077             first_task = tasks.pop()
1078             options = _prepare_chain_from_options(options, tasks, use_link)
1079
1080             result_from_apply = first_task.apply_async(**options)
1081             # If we only have a single task, it may be important that we pass
1082             # the real result object rather than the one obtained via freezing.
1083             # e.g. For `GroupResult`s, we need to pass back the result object
1084             # which will actually have its promise fulfilled by the subtasks,
1085             # something that will never occur for the frozen result.
1086             if not tasks:
1087                 return result_from_apply
1088             else:
1089                 return results_from_prepare[0]
1090
1091 # in order for a chain to be frozen, each of the members of the chain individually needs to be frozen
1092 # TODO figure out why we are always cloning before freeze
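The argument handling near the top of `run` (listing lines 1057 and 1063–1064) can be isolated as a small function. This is a minimal sketch, not Celery code: `merge_call_args` is a hypothetical helper, with `signature_args` standing in for `self.args` and `immutable` for `self.immutable`.

```python
def merge_call_args(call_args, signature_args, immutable):
    """Sketch of how `run` combines call-time args with the chain's own args.

    Hypothetical helper, not part of Celery.
    """
    # Falsy input (None or an empty sequence) is normalised to an empty tuple.
    call_args = call_args if call_args else ()
    # Call-time args are prepended only when present and the chain is mutable.
    if call_args and not immutable:
        return tuple(call_args) + tuple(signature_args)
    # Otherwise the chain's own args are used unchanged.
    return signature_args


prepended = merge_call_args((1,), (2, 3), immutable=False)
ignored = merge_call_args((1,), (2, 3), immutable=True)
untouched = merge_call_args(None, (2, 3), immutable=False)
```

Under these assumptions, call-time arguments land in front of the chain's own arguments, and an immutable chain ignores them entirely.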

Callers 1

apply_async · Method · 0.95

Calls 5

prepare_steps · Method · 0.95
extend_list_option · Method · 0.80
pop · Method · 0.45
apply_async · Method · 0.45

Tested by

no test coverage detected