| 930 | |
| 931 | @Signature.register_type(name='chain') |
| 932 | class _chain(Signature): |
| 933 | tasks = getitem_property('kwargs.tasks', 'Tasks in chain.') |
| 934 | |
@classmethod
def from_dict(cls, d, app=None):
    """Rebuild a chain from its dictionary form.

    The task list is read from ``d['kwargs']['tasks']``; every entry is
    passed through ``maybe_signature`` (with ``app``) before being handed
    to the constructor together with ``d['options']`` as keyword options.

    Note:
        When the stored task list is a non-empty tuple, ``d`` is modified
        in place: the tuple is replaced by an equivalent list.
    """
    chain_kwargs = d['kwargs']
    members = chain_kwargs['tasks']
    if members:
        if isinstance(members, tuple):
            # Tasks can arrive as a tuple (presumably after a serializer
            # round-trip -- TODO confirm); normalise to a list and write
            # it back so the source dict holds a list as well.
            members = list(members)
            chain_kwargs['tasks'] = members
        members = [maybe_signature(member, app=app) for member in members]
    return cls(members, app=app, **d['options'])
| 943 | |
def __init__(self, *tasks, **options):
    """Create a chain from the given tasks.

    Tasks may be given either as separate positional arguments or as one
    single list-like argument (as judged by ``is_list``); in the latter
    case that argument is wrapped with ``regen`` and used as the task
    collection.

    All keyword ``options`` are forwarded to the base initializer.
    ``use_link`` is additionally remembered on the instance as
    ``_use_link`` (``None`` when not supplied).
    """
    if len(tasks) == 1 and is_list(tasks[0]):
        tasks = regen(tasks[0])
    super().__init__('celery.chain', (), {'tasks': tasks}, **options)
    # Read only after the base initializer has received the full set of
    # options, so 'use_link' is still among the options passed to it.
    self._use_link = options.pop('use_link', None)
    self.subtask_type = 'chain'
    self._frozen = None
| 952 | |
def __call__(self, *args, **kwargs):
    """Run the chain through ``apply_async``.

    The positional and keyword arguments are passed to ``apply_async`` as
    a tuple and a dict respectively.  Returns ``None`` without doing
    anything when the chain has no tasks.
    """
    if not self.tasks:
        return None
    return self.apply_async(args, kwargs)
| 956 | |
| 957 | def __or__(self, other): |
| 958 | if isinstance(other, group): |
| 959 | # unroll group with one member |
| 960 | other = maybe_unroll_group(other) |
| 961 | if not isinstance(other, group): |
| 962 | return self.__or__(other) |
| 963 | # chain | group() -> chain |
| 964 | tasks = self.unchain_tasks() |
| 965 | if not tasks: |
| 966 | # If the chain is empty, return the group |
| 967 | return other |
| 968 | if isinstance(tasks[-1], chord): |
| 969 | # CHAIN [last item is chord] | GROUP -> chain with chord body. |
| 970 | tasks[-1].body = tasks[-1].body | other |
| 971 | return type(self)(tasks, app=self.app) |
| 972 | # use type(self) for _chain subclasses |
| 973 | return type(self)(seq_concat_item( |
| 974 | tasks, other), app=self._app) |
| 975 | elif isinstance(other, _chain): |
| 976 | # chain | chain -> chain |
| 977 | return reduce(operator.or_, other.unchain_tasks(), self) |
| 978 | elif isinstance(other, Signature): |
| 979 | if self.tasks and isinstance(self.tasks[-1], group): |
| 980 | # CHAIN [last item is group] | TASK -> chord |
| 981 | sig = self.clone() |
| 982 | sig.tasks[-1] = chord( |
| 983 | sig.tasks[-1], other, app=self._app) |
| 984 | # In the scenario where the second-to-last item in a chain is a chord, |
| 985 | # it leads to a situation where two consecutive chords are formed. |
| 986 | # In such cases, a further upgrade can be considered. |
| 987 | # This would involve chaining the body of the second-to-last chord with the last chord." |
| 988 | if len(sig.tasks) > 1 and isinstance(sig.tasks[-2], chord): |
| 989 | sig.tasks[-2].body = sig.tasks[-2].body | sig.tasks[-1] |
no test coverage detected