MCPcopy
hub / github.com/celery/celery / test_apply_async_adds_children

Method test_apply_async_adds_children

t/unit/app/test_app.py:884–904  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

882 i.annotate()
883
884 def test_apply_async_adds_children(self):
885 from celery._state import _task_stack
886
887 @self.app.task(bind=True, shared=False)
888 def a3cX1(self):
889 pass
890
891 @self.app.task(bind=True, shared=False)
892 def a3cX2(self):
893 pass
894
895 _task_stack.push(a3cX1)
896 try:
897 a3cX1.push_request(called_directly=False)
898 try:
899 res = a3cX2.apply_async(add_to_parent=True)
900 assert res in a3cX1.request.children
901 finally:
902 a3cX1.pop_request()
903 finally:
904 _task_stack.pop()
905
906 def test_pickle_app(self):
907 changes = {'THE_FOO_BAR': 'bars',

Callers

nothing calls this directly

Calls 5

pushMethod · 0.80
push_requestMethod · 0.80
pop_requestMethod · 0.80
apply_asyncMethod · 0.45
popMethod · 0.45

Tested by

no test coverage detected