MCPcopy
hub / github.com/celery/celery / test_apply

Method test_apply

t/unit/tasks/test_chord.py:281–299  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

279 assert result.get() == sum(i + i for i in range(10))
280
281 def test_apply(self):
282 self.app.conf.task_always_eager = False
283 from celery import chord
284
285 m = Mock()
286 m.app.conf.task_always_eager = False
287 m.AsyncResult = AsyncResult
288 prev, chord.run = chord.run, m
289 try:
290 x = chord(self.add.s(i, i) for i in range(10))
291 body = self.add.s(2)
292 result = x(body)
293 assert result.id
294 # does not modify original signature
295 with pytest.raises(KeyError):
296 body.options['task_id']
297 chord.run.assert_called()
298 finally:
299 chord.run = prev
300
301 def test_init(self):
302 from celery import chord

Callers

nothing calls this directly

Calls 4

chordFunction · 0.90
xFunction · 0.85
sMethod · 0.45
raisesMethod · 0.45

Tested by

no test coverage detected