MCPcopy
hub / github.com/celery/celery / _chain

Class _chain

celery/canvas.py:932–1305  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

930
931@Signature.register_type(name='chain')
932class _chain(Signature):
933 tasks = getitem_property('kwargs.tasks', 'Tasks in chain.')
934
935 @classmethod
936 def from_dict(cls, d, app=None):
937 tasks = d['kwargs']['tasks']
938 if tasks:
939 if isinstance(tasks, tuple): # aaaargh
940 tasks = d['kwargs']['tasks'] = list(tasks)
941 tasks = [maybe_signature(task, app=app) for task in tasks]
942 return cls(tasks, app=app, **d['options'])
943
944 def __init__(self, *tasks, **options):
945 tasks = (regen(tasks[0]) if len(tasks) == 1 and is_list(tasks[0])
946 else tasks)
947 super().__init__('celery.chain', (), {'tasks': tasks}, **options
948 )
949 self._use_link = options.pop('use_link', None)
950 self.subtask_type = 'chain'
951 self._frozen = None
952
953 def __call__(self, *args, **kwargs):
954 if self.tasks:
955 return self.apply_async(args, kwargs)
956
957 def __or__(self, other):
958 if isinstance(other, group):
959 # unroll group with one member
960 other = maybe_unroll_group(other)
961 if not isinstance(other, group):
962 return self.__or__(other)
963 # chain | group() -> chain
964 tasks = self.unchain_tasks()
965 if not tasks:
966 # If the chain is empty, return the group
967 return other
968 if isinstance(tasks[-1], chord):
969 # CHAIN [last item is chord] | GROUP -> chain with chord body.
970 tasks[-1].body = tasks[-1].body | other
971 return type(self)(tasks, app=self.app)
972 # use type(self) for _chain subclasses
973 return type(self)(seq_concat_item(
974 tasks, other), app=self._app)
975 elif isinstance(other, _chain):
976 # chain | chain -> chain
977 return reduce(operator.or_, other.unchain_tasks(), self)
978 elif isinstance(other, Signature):
979 if self.tasks and isinstance(self.tasks[-1], group):
980 # CHAIN [last item is group] | TASK -> chord
981 sig = self.clone()
982 sig.tasks[-1] = chord(
983 sig.tasks[-1], other, app=self._app)
984 # In the scenario where the second-to-last item in a chain is a chord,
985 # it leads to a situation where two consecutive chords are formed.
986 # In such cases, a further upgrade can be considered.
987 # This would involve chaining the body of the second-to-last chord with the last chord."
988 if len(sig.tasks) > 1 and isinstance(sig.tasks[-2], chord):
989 sig.tasks[-2].body = sig.tasks[-2].body | sig.tasks[-1]

Callers 2

__or__Method · 0.85
__new__Method · 0.85

Calls 1

getitem_propertyClass · 0.90

Tested by

no test coverage detected