hub / github.com/celery/celery / test_parent_ids

Method test_parent_ids

t/integration/test_canvas.py:299–312  ·  t/integration/test_canvas.py::test_chain.test_parent_ids
(self, manager, num=10)

Source from the content-addressed store, hash-verified

@flaky
def test_parent_ids(self, manager, num=10):
    assert_ping(manager)

    c = chain(ids.si(i=i) for i in range(num))
    c.freeze()
    res = c()
    try:
        res.get(timeout=TIMEOUT)
    except TimeoutError:
        print(manager.inspect().active())
        print(manager.inspect().reserved())
        print(manager.inspect().stats())
        raise
    self.assert_ids(res, num - 1)

def assert_ids(self, res, size):
    i, root = size, res
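The test chains `num` immutable signatures and hands the final result, together with `num - 1`, to `assert_ids`, whose body is cut off in this listing. As a rough illustration of the kind of parent-link walk such a check could perform, here is a minimal broker-free sketch. It assumes only that each result exposes a `parent` pointing at the previous step's result. `FakeResult`, `build_chain`, and `walk_parents` are hypothetical names invented for this example. They are not Celery APIs and do not reproduce the real `assert_ids`.

```python
import uuid
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for an async result (not Celery's AsyncResult)."""
    id: str
    value: int
    parent: Optional["FakeResult"] = None


def build_chain(num: int) -> Optional[FakeResult]:
    """Link `num` results so each points at its predecessor; return the last one."""
    node = None
    for i in range(num):
        node = FakeResult(id=str(uuid.uuid4()), value=i, parent=node)
    return node


def walk_parents(res: Optional[FakeResult]) -> List[int]:
    """Collect values from the final result back to the root via `parent`."""
    values = []
    node = res
    while node is not None:
        values.append(node.value)
        node = node.parent
    return values


num = 10
last = build_chain(num)
values = walk_parents(last)

# The final step carries index num - 1, and the walk visits every step once.
assert last.value == num - 1
assert len(values) == num
```

Starting the walk at the last result and counting down mirrors the way the test passes `num - 1` as the initial `size`.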

Callers

nothing calls this directly

Calls 10

assert_ids · Method · 0.95
chain · Class · 0.90
assert_ping · Function · 0.85
si · Method · 0.80
reserved · Method · 0.80
freeze · Method · 0.45
get · Method · 0.45
active · Method · 0.45
inspect · Method · 0.45
stats · Method · 0.45

Tested by

no test coverage detected