(self, manager, subtests)
| 3371 | |
| 3372 | class test_stamping_mechanism: |
| 3373 | def test_stamping_workflow(self, manager, subtests): |
| 3374 | try: |
| 3375 | manager.app.backend.ensure_chords_allowed() |
| 3376 | except NotImplementedError as e: |
| 3377 | raise pytest.skip(e.args[0]) |
| 3378 | |
| 3379 | workflow = group( |
| 3380 | add.s(1, 2) | add.s(3), |
| 3381 | add.s(4, 5) | add.s(6), |
| 3382 | identity.si(21), |
| 3383 | ) | group( |
| 3384 | xsum.s(), |
| 3385 | xsum.s(), |
| 3386 | ) |
| 3387 | |
| 3388 | @task_received.connect |
| 3389 | def task_received_handler(request=None, **kwargs): |
| 3390 | nonlocal assertion_result |
| 3391 | link = None |
| 3392 | if request._Request__payload[2]["callbacks"]: |
| 3393 | link = signature(request._Request__payload[2]["callbacks"][0]) |
| 3394 | link_error = None |
| 3395 | if request._Request__payload[2]["errbacks"]: |
| 3396 | link_error = signature(request._Request__payload[2]["errbacks"][0]) |
| 3397 | |
| 3398 | assertion_result = all( |
| 3399 | [ |
| 3400 | assertion_result, |
| 3401 | [stamped_header in request.stamps for stamped_header in request.stamped_headers], |
| 3402 | [ |
| 3403 | stamped_header in link.options |
| 3404 | for stamped_header in link.options["stamped_headers"] |
| 3405 | if link # the link itself doesn't have a link |
| 3406 | ], |
| 3407 | [ |
| 3408 | stamped_header in link_error.options |
| 3409 | for stamped_header in link_error.options["stamped_headers"] |
| 3410 | if link_error # the link_error itself doesn't have a link_error |
| 3411 | ], |
| 3412 | ] |
| 3413 | ) |
| 3414 | |
| 3415 | @before_task_publish.connect |
| 3416 | def before_task_publish_handler( |
| 3417 | body=None, |
| 3418 | headers=None, |
| 3419 | **kwargs, |
| 3420 | ): |
| 3421 | nonlocal assertion_result |
| 3422 | |
| 3423 | assertion_result = all( |
| 3424 | [stamped_header in headers["stamps"] for stamped_header in headers["stamped_headers"]] |
| 3425 | ) |
| 3426 | |
class CustomStampingVisitor(StampingVisitor):
    """Visitor whose ``on_signature`` hook always returns one fixed mapping."""

    def on_signature(self, sig, **headers) -> dict:
        """Return ``{"on_signature": 42}``.

        ``sig`` and ``headers`` are accepted but never read.
        """
        stamp = {"on_signature": 42}
        return stamp
| 3430 |
nothing calls this directly
no test coverage detected