MCPcopy
hub / github.com/celery/celery / test_group_prepared

Method test_group_prepared

t/unit/tasks/test_canvas.py:1266–1277  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1264 mock_set_chord_size.assert_has_calls((call(ANY, 1),) * child_count)
1265
1266 def test_group_prepared(self):
1267 # Using both partial and dict based signatures
1268 sig = group(dict(self.add.s(0)), self.add.s(0))
1269 _, group_id, root_id = sig._freeze_gid({})
1270 tasks = sig._prepared(sig.tasks, [42], group_id, root_id, self.app)
1271
1272 for task, result, group_id in tasks:
1273 assert isinstance(task, Signature)
1274 assert task.args[0] == 42
1275 assert task.args[1] == 0
1276 assert isinstance(result, AsyncResult)
1277 assert group_id is not None
1278
1279 def test_task_replace_with_group_preserves_group_order(self):
1280 self.app.conf.task_always_eager = True

Callers

nothing calls this directly

Calls 4

groupClass · 0.90
_freeze_gidMethod · 0.80
_preparedMethod · 0.80
sMethod · 0.45

Tested by

no test coverage detected