MCPcopy
hub / github.com/celery/celery / test_get

Method test_get

t/unit/tasks/test_result.py:325–343  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

323 assert res.get() is None
324
def test_get(self):
    """Check ``get()``, ``result``, ``info`` and ``state`` on several results.

    Results are built from the ids of ``task1``-``task4`` and ``task6``.
    The assertions expect two plain return values, two ``KeyError``
    failures, and one ``None`` result whose state is ``SUCCESS``.
    """
    tasks = (self.task1, self.task2, self.task3, self.task4, self.task6)
    first_ok, second_ok, first_err, second_err, empty = (
        self.app.AsyncResult(task['id']) for task in tasks
    )
    on_result = Mock(name='callback')

    # The callback handed to get() is expected to receive (task id, value).
    value = first_ok.get(callback=on_result)
    assert value == 'the'
    on_result.assert_called_with(first_ok.id, 'the')

    assert second_ok.get() == 'quick'

    # get() raises by default; with propagate=False it must return
    # something truthy instead of raising.
    with pytest.raises(KeyError):
        first_err.get()
    assert first_err.get(propagate=False)
    assert isinstance(second_err.result, KeyError)

    assert first_ok.info == 'the'

    # A result of None still counts as a successful task.
    assert empty.get() is None
    assert empty.state == states.SUCCESS
344
345 def test_get_when_ignored(self):
346 result = self.app.AsyncResult(uuid())

Callers

nothing calls this directly

Calls 3

get — Method · 0.95
AsyncResult — Method · 0.45
raises — Method · 0.45

Tested by

no test coverage detected