MCPcopy
hub / github.com/celery/celery / __or__

Method __or__

celery/canvas.py:957–1003  ·  view source on GitHub ↗
(self, other)

Source from the content-addressed store, hash-verified

955 return self.apply_async(args, kwargs)
956
def __or__(self, other):
    """Compose this chain with ``other`` using the ``|`` operator.

    The result depends on the type of ``other``:

    * ``group`` -- the group is first passed through
      ``maybe_unroll_group``; if that yields a non-group, the operation
      is dispatched again on the unrolled value.  Otherwise the group is
      returned as is when this chain has no tasks, chained onto the body
      of a trailing chord, or appended to the chain's tasks.
    * ``_chain`` -- every task of the other chain is folded onto this
      one with ``|``.
    * any other ``Signature`` -- a trailing group is combined with
      ``other`` into a chord, a trailing chord gets ``other`` chained
      onto its body (unless ``other`` is itself a chord), and in every
      remaining case ``other`` is appended to the chain's tasks.

    Returns ``NotImplemented`` when ``other`` is none of the above.
    """
    if isinstance(other, group):
        # A group with one member is unrolled to that member.
        other = maybe_unroll_group(other)
        if not isinstance(other, group):
            # No longer a group: dispatch again on the unrolled value.
            return self.__or__(other)

        # chain | group() -> chain
        steps = self.unchain_tasks()
        if not steps:
            # Nothing to chain onto, so the group itself is the result.
            return other
        if not isinstance(steps[-1], chord):
            # type(self) keeps _chain subclasses intact.
            return type(self)(seq_concat_item(steps, other), app=self._app)
        # CHAIN [last item is chord] | GROUP -> chain with chord body.
        steps[-1].body = steps[-1].body | other
        # NOTE(review): this branch passes ``self.app`` while every other
        # branch passes ``self._app`` -- confirm whether that is intended.
        return type(self)(steps, app=self.app)

    if isinstance(other, _chain):
        # chain | chain -> chain
        return reduce(operator.or_, other.unchain_tasks(), self)

    if not isinstance(other, Signature):
        return NotImplemented

    if self.tasks and isinstance(self.tasks[-1], group):
        # CHAIN [last item is group] | TASK -> chord
        combined = self.clone()
        combined.tasks[-1] = chord(
            combined.tasks[-1], other, app=self._app)
        # If the item before the new chord is a chord as well, we would
        # end up with two consecutive chords.  Chain the new chord onto
        # the body of the previous one and drop it from the task list.
        if len(combined.tasks) > 1 and isinstance(combined.tasks[-2], chord):
            combined.tasks[-2].body = (
                combined.tasks[-2].body | combined.tasks[-1])
            combined.tasks = combined.tasks[:-1]
        return combined

    if (self.tasks and isinstance(self.tasks[-1], chord)
            and not isinstance(other, chord)):
        # CHAIN [last item is chord] | TASK -> chain with chord body.
        combined = self.clone()
        combined.tasks[-1].body = combined.tasks[-1].body | other
        return combined

    # chain | task/chord -> chain
    # type(self) keeps _chain subclasses intact.
    return type(self)(
        seq_concat_item(self.unchain_tasks(), other), app=self._app)
1004
1005 def clone(self, *args, **kwargs):
1006 to_signature = maybe_signature

Callers

nothing calls this directly

Calls 5

unchain_tasksMethod · 0.95
cloneMethod · 0.95
seq_concat_itemFunction · 0.90
maybe_unroll_groupFunction · 0.85
chordFunction · 0.85

Tested by

no test coverage detected