MCPcopy
hub / github.com/celery/celery / assert_ids

Method assert_ids

t/integration/test_canvas.py:314–326  ·  view source on GitHub ↗
(self, res, size)

Source from the content-addressed store, hash-verified

312 self.assert_ids(res, num - 1)
313
314 def assert_ids(self, res, size):
315 i, root = size, res
316 while root.parent:
317 root = root.parent
318 node = res
319 while node:
320 root_id, parent_id, value = node.get(timeout=30)
321 assert value == i
322 if node.parent:
323 assert parent_id == node.parent.id
324 assert root_id == root.id
325 node = node.parent
326 i -= 1
327
328 def test_chord_soft_timeout_recuperation(self, manager):
329 """Test that if soft timeout happens in task but is managed by task,

Callers 1

test_parent_idsMethod · 0.95

Calls 1

getMethod · 0.45

Tested by

no test coverage detected