MCPcopy
hub / github.com/celery/celery / test_stamping_workflow

Method test_stamping_workflow

t/integration/test_canvas.py:3373–3442  ·  view source on GitHub ↗
(self, manager, subtests)

Source from the content-addressed store, hash-verified

3371
3372class test_stamping_mechanism:
3373 def test_stamping_workflow(self, manager, subtests):
3374 try:
3375 manager.app.backend.ensure_chords_allowed()
3376 except NotImplementedError as e:
3377 raise pytest.skip(e.args[0])
3378
3379 workflow = group(
3380 add.s(1, 2) | add.s(3),
3381 add.s(4, 5) | add.s(6),
3382 identity.si(21),
3383 ) | group(
3384 xsum.s(),
3385 xsum.s(),
3386 )
3387
3388 @task_received.connect
3389 def task_received_handler(request=None, **kwargs):
3390 nonlocal assertion_result
3391 link = None
3392 if request._Request__payload[2]["callbacks"]:
3393 link = signature(request._Request__payload[2]["callbacks"][0])
3394 link_error = None
3395 if request._Request__payload[2]["errbacks"]:
3396 link_error = signature(request._Request__payload[2]["errbacks"][0])
3397
3398 assertion_result = all(
3399 [
3400 assertion_result,
3401 [stamped_header in request.stamps for stamped_header in request.stamped_headers],
3402 [
3403 stamped_header in link.options
3404 for stamped_header in link.options["stamped_headers"]
3405 if link # the link itself doesn't have a link
3406 ],
3407 [
3408 stamped_header in link_error.options
3409 for stamped_header in link_error.options["stamped_headers"]
3410 if link_error # the link_error itself doesn't have a link_error
3411 ],
3412 ]
3413 )
3414
3415 @before_task_publish.connect
3416 def before_task_publish_handler(
3417 body=None,
3418 headers=None,
3419 **kwargs,
3420 ):
3421 nonlocal assertion_result
3422
3423 assertion_result = all(
3424 [stamped_header in headers["stamps"] for stamped_header in headers["stamped_headers"]]
3425 )
3426
3427 class CustomStampingVisitor(StampingVisitor):
3428 def on_signature(self, sig, **headers) -> dict:
3429 return {"on_signature": 42}
3430

Callers

nothing calls this directly

Calls 11

group — Class · 0.90
si — Method · 0.80
test — Method · 0.80
ensure_chords_allowed — Method · 0.45
s — Method · 0.45
link — Method · 0.45
link_error — Method · 0.45
stamp — Method · 0.45
get — Method · 0.45
apply_async — Method · 0.45

Tested by

no test coverage detected