(self, other)
| 955 | return self.apply_async(args, kwargs) |
| 956 | |
| 957 | def __or__(self, other): |
| 958 | if isinstance(other, group): |
| 959 | # unroll group with one member |
| 960 | other = maybe_unroll_group(other) |
| 961 | if not isinstance(other, group): |
| 962 | return self.__or__(other) |
| 963 | # chain | group() -> chain |
| 964 | tasks = self.unchain_tasks() |
| 965 | if not tasks: |
| 966 | # If the chain is empty, return the group |
| 967 | return other |
| 968 | if isinstance(tasks[-1], chord): |
| 969 | # CHAIN [last item is chord] | GROUP -> chain with chord body. |
| 970 | tasks[-1].body = tasks[-1].body | other |
| 971 | return type(self)(tasks, app=self.app) |
| 972 | # use type(self) for _chain subclasses |
| 973 | return type(self)(seq_concat_item( |
| 974 | tasks, other), app=self._app) |
| 975 | elif isinstance(other, _chain): |
| 976 | # chain | chain -> chain |
| 977 | return reduce(operator.or_, other.unchain_tasks(), self) |
| 978 | elif isinstance(other, Signature): |
| 979 | if self.tasks and isinstance(self.tasks[-1], group): |
| 980 | # CHAIN [last item is group] | TASK -> chord |
| 981 | sig = self.clone() |
| 982 | sig.tasks[-1] = chord( |
| 983 | sig.tasks[-1], other, app=self._app) |
| 984 | # In the scenario where the second-to-last item in a chain is a chord, |
| 985 | # it leads to a situation where two consecutive chords are formed. |
| 986 | # In such cases, a further upgrade can be considered. |
| 987 | # This would involve chaining the body of the second-to-last chord with the last chord." |
| 988 | if len(sig.tasks) > 1 and isinstance(sig.tasks[-2], chord): |
| 989 | sig.tasks[-2].body = sig.tasks[-2].body | sig.tasks[-1] |
| 990 | sig.tasks = sig.tasks[:-1] |
| 991 | return sig |
| 992 | elif self.tasks and isinstance(self.tasks[-1], chord) and not isinstance(other, chord): |
| 993 | # CHAIN [last item is chord] | TASK -> chain with chord body. |
| 994 | sig = self.clone() |
| 995 | sig.tasks[-1].body = sig.tasks[-1].body | other |
| 996 | return sig |
| 997 | else: |
| 998 | # chain | task/chord -> chain |
| 999 | # use type(self) for _chain subclasses |
| 1000 | return type(self)(seq_concat_item( |
| 1001 | self.unchain_tasks(), other), app=self._app) |
| 1002 | else: |
| 1003 | return NotImplemented |
| 1004 | |
| 1005 | def clone(self, *args, **kwargs): |
| 1006 | to_signature = maybe_signature |
nothing calls this directly
no test coverage detected