hub / github.com/celery/celery / on_signature

Method on_signature

t/unit/tasks/test_stamping.py:13–22
(self, actual_sig: Signature, **headers)

Source from the content-addressed store, hash-verified

class LinkingVisitor(StampingVisitor):
    def on_signature(self, actual_sig: Signature, **headers) -> dict:
        link_workflow = chain(
            group(signature("task1"), signature("task2")),
            signature("task3"),
        )
        link = signature(f"{actual_sig.name}_link") | link_workflow.clone()
        actual_sig.link(link)
        link_error = signature(f"{actual_sig.name}_link_error") | link_workflow.clone()
        actual_sig.link_error(link_error)
        return super().on_signature(actual_sig, **headers)
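
The pattern in the method above (attach a success callback and an error callback to the visited signature, then defer to the parent visitor for the stamp headers) can be sketched without a Celery installation. Everything in this sketch is a hypothetical stand-in: FakeSignature, BaseVisitor, and LinkingVisitorSketch are illustration-only names, not Celery classes, and plain strings replace the chained callback workflows. The base visitor is assumed to contribute no headers.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSignature:
    """Hypothetical stand-in for a task signature (not celery.canvas.Signature)."""

    name: str
    links: list = field(default_factory=list)
    error_links: list = field(default_factory=list)

    def link(self, callback):
        # Record a callback to run after the task succeeds.
        self.links.append(callback)

    def link_error(self, errback):
        # Record a callback to run after the task fails.
        self.error_links.append(errback)


class BaseVisitor:
    """Hypothetical stand-in for a stamping visitor base class."""

    def on_signature(self, sig, **headers) -> dict:
        # Assumption: the base hook adds no stamp headers of its own.
        return {}


class LinkingVisitorSketch(BaseVisitor):
    def on_signature(self, sig, **headers) -> dict:
        # Side effect: attach callbacks named after the visited signature.
        sig.link(f"{sig.name}_link")
        sig.link_error(f"{sig.name}_link_error")
        # The headers to stamp still come from the parent hook.
        return super().on_signature(sig, **headers)


sig = FakeSignature("task0")
stamps = LinkingVisitorSketch().on_signature(sig, origin="example")
print(sig.links, sig.error_links, stamps)
```

The hook mutates the signature it visits and only then delegates, so the returned headers are the same whether or not the callbacks were attached.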

Callers 4

on_signature · Method · 0.45
on_signature · Method · 0.45
on_signature · Method · 0.45

Calls 6

chain · Class · 0.90
group · Class · 0.90
signature · Function · 0.90
clone · Method · 0.45
link · Method · 0.45
link_error · Method · 0.45

Tested by

no test coverage detected