hub / github.com/celery/celery / test_apply_async_with_parent

Method test_apply_async_with_parent

t/unit/tasks/test_canvas.py:960–974
(self)

Source from the content-addressed store, hash-verified

        assert not res.results

    def test_apply_async_with_parent(self):
        _task_stack.push(self.add)
        try:
            self.add.push_request(called_directly=False)
            try:
                assert not self.add.request.children
                x = group([self.add.s(4, 4), self.add.s(8, 8)])
                res = x()
                assert self.add.request.children
                assert res in self.add.request.children
                assert len(self.add.request.children) == 1
            finally:
                self.add.pop_request()
        finally:
            _task_stack.pop()

    @pytest.mark.parametrize("group_type", (group, group_subclass))
    def test_from_dict(self, group_type):
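
The test above checks that a group launched while a task is executing is recorded once in that task's request.children. The sketch below is a simplified, standard-library-only model of that bookkeeping, not Celery's implementation. Every name in it (Request, Task, GroupResult, apply_group, current_task, task_stack) is a hypothetical stand-in, and the rule that only a task whose request has called_directly=False counts as a parent is an assumption made for illustration.

```python
# Simplified, hypothetical model of parent/child tracking. Not Celery's code.


class Request:
    """Per-execution context; `children` collects results spawned by the task."""

    def __init__(self, called_directly=True):
        self.called_directly = called_directly
        self.children = []


class Task:
    """Minimal task holding a stack of request contexts."""

    def __init__(self, name):
        self.name = name
        self._request_stack = []

    def push_request(self, **kwargs):
        self._request_stack.append(Request(**kwargs))

    def pop_request(self):
        self._request_stack.pop()

    @property
    def request(self):
        return self._request_stack[-1]


task_stack = []  # stand-in for the `_task_stack` used in the test


def current_task():
    """Return the task on top of the stack, or None when nothing is running."""
    return task_stack[-1] if task_stack else None


class GroupResult:
    def __init__(self, results):
        self.results = results


def apply_group(signatures):
    """Run each (func, args) pair and register ONE result with the parent."""
    result = GroupResult([func(*args) for func, args in signatures])
    parent = current_task()
    # Assumption: only a task that was not called directly acts as a parent.
    if parent is not None and not parent.request.called_directly:
        parent.request.children.append(result)
    return result


def add(x, y):
    return x + y


# Mirror the structure of the test: push the task, push a request,
# apply the group, then unwind both stacks in `finally` blocks.
add_task = Task("add")
task_stack.append(add_task)
try:
    add_task.push_request(called_directly=False)
    try:
        res = apply_group([(add, (4, 4)), (add, (8, 8))])
        children = list(add_task.request.children)
    finally:
        add_task.pop_request()
finally:
    task_stack.pop()
```

In this model the group contributes a single entry to children, regardless of how many signatures it contains, which is the property the final assertion in the test (a length of 1) appears to guard. The nested try/finally blocks keep both stacks balanced even when an assertion fails.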

Callers

nothing calls this directly

Calls 7

group · Class · 0.90
x · Function · 0.85
push · Method · 0.80
push_request · Method · 0.80
pop_request · Method · 0.80
s · Method · 0.45
pop · Method · 0.45

Tested by

no test coverage detected