Replace this task, with a new task inheriting the task id. Execution of the host task ends immediately and no subsequent statements will be run. .. versionadded:: 4.0 Arguments: sig (Signature): signature to replace with. visitor (StampingVi
(self, sig)
| 947 | uuid=req.id, retry=retry, retry_policy=retry_policy, **fields) |
| 948 | |
| 949 | def replace(self, sig): |
| 950 | """Replace this task, with a new task inheriting the task id. |
| 951 | |
| 952 | Execution of the host task ends immediately and no subsequent statements |
| 953 | will be run. |
| 954 | |
| 955 | .. versionadded:: 4.0 |
| 956 | |
| 957 | Arguments: |
| 958 | sig (Signature): signature to replace with. |
| 959 | visitor (StampingVisitor): Visitor API object. |
| 960 | |
| 961 | Raises: |
| 962 | ~@Ignore: This is always raised when called in asynchronous context. |
| 963 | It is best to always use ``return self.replace(...)`` to convey |
| 964 | to the reader that the task won't continue after being replaced. |
| 965 | """ |
| 966 | chord = self.request.chord |
| 967 | if 'chord' in sig.options: |
| 968 | raise ImproperlyConfigured( |
| 969 | "A signature replacing a task must not be part of a chord" |
| 970 | ) |
| 971 | if isinstance(sig, _chain) and not getattr(sig, "tasks", True): |
| 972 | raise ImproperlyConfigured("Cannot replace with an empty chain") |
| 973 | |
| 974 | # Ensure callbacks or errbacks from the replaced signature are retained |
| 975 | if isinstance(sig, group): |
| 976 | # Groups get uplifted to a chord so that we can link onto the body |
| 977 | sig |= self.app.tasks['celery.accumulate'].s(index=0) |
| 978 | for callback in maybe_list(self.request.callbacks) or []: |
| 979 | sig.link(callback) |
| 980 | for errback in maybe_list(self.request.errbacks) or []: |
| 981 | sig.link_error(errback) |
| 982 | # If the replacement signature is a chain, we need to push callbacks |
| 983 | # down to the final task so they run at the right time even if we |
| 984 | # proceed to link further tasks from the original request below |
| 985 | if isinstance(sig, _chain) and "link" in sig.options: |
| 986 | final_task_links = sig.tasks[-1].options.setdefault("link", []) |
| 987 | final_task_links.extend(maybe_list(sig.options["link"])) |
| 988 | # We need to freeze the replacement signature with the current task's |
| 989 | # ID to ensure that we don't disassociate it from the existing task IDs |
| 990 | # which would break previously constructed results objects. |
| 991 | sig.freeze(self.request.id) |
| 992 | # Ensure the important options from the original signature are retained |
| 993 | replaced_task_nesting = self.request.get('replaced_task_nesting', 0) + 1 |
| 994 | sig.set( |
| 995 | chord=chord, |
| 996 | group_id=self.request.group, |
| 997 | group_index=self.request.group_index, |
| 998 | root_id=self.request.root_id, |
| 999 | replaced_task_nesting=replaced_task_nesting |
| 1000 | ) |
| 1001 | |
| 1002 | # If the replaced task is a chain, we want to set all of the chain tasks |
| 1003 | # with the same replaced_task_nesting value to mark their replacement nesting level |
| 1004 | if isinstance(sig, _chain): |
| 1005 | for chain_task in maybe_list(sig.tasks) or []: |
| 1006 | chain_task.set(replaced_task_nesting=replaced_task_nesting) |