hub / github.com/celery/celery / replace

Method replace

celery/app/task.py:949–1015

Replace this task with a new task that inherits the task id. Execution of the host task ends immediately and no subsequent statements will be run. Added in version 4.0.

(self, sig)


def replace(self, sig):
    """Replace this task, with a new task inheriting the task id.

    Execution of the host task ends immediately and no subsequent statements
    will be run.

    .. versionadded:: 4.0

    Arguments:
        sig (Signature): signature to replace with.
        visitor (StampingVisitor): Visitor API object.

    Raises:
        ~@Ignore: This is always raised when called in asynchronous context.
        It is best to always use ``return self.replace(...)`` to convey
        to the reader that the task won't continue after being replaced.
    """
    chord = self.request.chord
    if 'chord' in sig.options:
        raise ImproperlyConfigured(
            "A signature replacing a task must not be part of a chord"
        )
    if isinstance(sig, _chain) and not getattr(sig, "tasks", True):
        raise ImproperlyConfigured("Cannot replace with an empty chain")

    # Ensure callbacks or errbacks from the replaced signature are retained
    if isinstance(sig, group):
        # Groups get uplifted to a chord so that we can link onto the body
        sig |= self.app.tasks['celery.accumulate'].s(index=0)
    for callback in maybe_list(self.request.callbacks) or []:
        sig.link(callback)
    for errback in maybe_list(self.request.errbacks) or []:
        sig.link_error(errback)
    # If the replacement signature is a chain, we need to push callbacks
    # down to the final task so they run at the right time even if we
    # proceed to link further tasks from the original request below
    if isinstance(sig, _chain) and "link" in sig.options:
        final_task_links = sig.tasks[-1].options.setdefault("link", [])
        final_task_links.extend(maybe_list(sig.options["link"]))
    # We need to freeze the replacement signature with the current task's
    # ID to ensure that we don't disassociate it from the existing task IDs
    # which would break previously constructed results objects.
    sig.freeze(self.request.id)
    # Ensure the important options from the original signature are retained
    replaced_task_nesting = self.request.get('replaced_task_nesting', 0) + 1
    sig.set(
        chord=chord,
        group_id=self.request.group,
        group_index=self.request.group_index,
        root_id=self.request.root_id,
        replaced_task_nesting=replaced_task_nesting
    )

    # If the replaced task is a chain, we want to set all of the chain tasks
    # with the same replaced_task_nesting value to mark their replacement nesting level
    if isinstance(sig, _chain):
        for chain_task in maybe_list(sig.tasks) or []:
            chain_task.set(replaced_task_nesting=replaced_task_nesting)
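
The chain-specific steps in the listing (pushing chain-level callbacks down to the final task, then stamping every task with the same nesting level) may be easier to follow in isolation. The following is a minimal sketch that models signatures as plain dicts rather than Celery objects. The helper names `push_links_to_final_task` and `stamp_nesting`, the dict layout, and the task names are hypothetical and exist only for illustration. It also assumes `link` is already a list, whereas the listing normalizes it with `maybe_list`.

```python
from copy import deepcopy


def push_links_to_final_task(chain_sig):
    """Model of the chain branch: append chain-level links to the last task."""
    sig = deepcopy(chain_sig)  # leave the caller's dict untouched
    if "link" in sig["options"]:
        final_links = sig["tasks"][-1]["options"].setdefault("link", [])
        final_links.extend(sig["options"]["link"])
    return sig


def stamp_nesting(chain_sig, request):
    """Model of the nesting counter: parent depth + 1 on the chain and each task."""
    sig = deepcopy(chain_sig)
    nesting = request.get("replaced_task_nesting", 0) + 1
    sig["options"]["replaced_task_nesting"] = nesting
    for task in sig["tasks"]:
        task["options"]["replaced_task_nesting"] = nesting
    return sig


# Hypothetical two-step chain with one chain-level callback ("audit")
# and one callback already attached to the final task ("notify").
chain_sig = {
    "tasks": [
        {"task": "step_one", "options": {}},
        {"task": "step_two", "options": {"link": ["notify"]}},
    ],
    "options": {"link": ["audit"]},
}

pushed = push_links_to_final_task(chain_sig)
# The replacing task was itself a replacement, so its request carries depth 1.
stamped = stamp_nesting(pushed, request={"replaced_task_nesting": 1})

print(pushed["tasks"][-1]["options"]["link"])   # ['notify', 'audit']
print([t["options"]["replaced_task_nesting"] for t in stamped["tasks"]])  # [2, 2]
```

In this model the final task ends up with both its own callback and the chain-level one, and a first-level replacement (a request with no `replaced_task_nesting` key) would stamp every task with 1.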

Callers 15

author · Function · 0.45
add_replaced · Function · 0.45
replace_with_chain · Function · 0.45
replace_with_empty_chain · Function · 0.45
add_to_all · Function · 0.45
second_order_replace1 · Function · 0.45
second_order_replace2 · Function · 0.45
fail_replaced · Function · 0.45
old_repr · Function · 0.45

Calls 12

on_replace · Method · 0.95
signature · Function · 0.90
maybe_list · Function · 0.85
s · Method · 0.45
link · Method · 0.45
link_error · Method · 0.45
setdefault · Method · 0.45
extend · Method · 0.45
freeze · Method · 0.45
get · Method · 0.45
set · Method · 0.45

Tested by 15

old_repr · Function · 0.36
test_text · Method · 0.36
test_maybe_make_aware · Method · 0.36
replaced · Method · 0.36
replaced_group · Method · 0.36
replace_with_group · Method · 0.36
replace_with_chain · Method · 0.36
replaced · Method · 0.36