(self, actual_sig: Signature, **headers)
| 11 | |
class LinkingVisitor(StampingVisitor):
    """Stamping visitor that attaches link and link_error callbacks to every visited signature."""

    def on_signature(self, actual_sig: Signature, **headers) -> dict:
        """Attach callback workflows to ``actual_sig``, then delegate to the parent visitor.

        A template workflow is built as a chain of a two-task group
        (``task1``, ``task2``) followed by ``task3``.  Two callbacks are then
        derived from it, each starting with a signature named after
        ``actual_sig`` and continuing with its own clone of the template:

        * ``<name>_link`` is registered through ``actual_sig.link``.
        * ``<name>_link_error`` is registered through ``actual_sig.link_error``.

        Args:
            actual_sig: The signature currently being visited; it is mutated
                in place by the two registrations above.
            **headers: Forwarded unchanged to the parent ``on_signature``.

        Returns:
            Whatever the parent class's ``on_signature`` returns for the same
            arguments.
        """
        fan_out = group(signature("task1"), signature("task2"))
        callback_template = chain(fan_out, signature("task3"))

        # Each callback gets its own clone so the two never share one workflow object.
        success_head = signature(f"{actual_sig.name}_link")
        actual_sig.link(success_head | callback_template.clone())

        failure_head = signature(f"{actual_sig.name}_link_error")
        actual_sig.link_error(failure_head | callback_template.clone())

        return super().on_signature(actual_sig, **headers)
| 23 | |
| 24 | |
| 25 | class CleanupVisitor(StampingVisitor): |
no test coverage detected