(self)
| 323 | assert res.get() is None |
| 324 | |
def test_get(self):
    """Check ``get()``, ``result``, ``info`` and ``state`` on stored results.

    Builds ``AsyncResult`` handles for the ``task1``, ``task2``, ``task3``,
    ``task4`` and ``task6`` fixtures and verifies that:

    * ``get()`` returns the stored value and invokes ``callback`` with
      the result id and that value;
    * ``get()`` raises ``KeyError`` for the ``task3`` result, while
      ``get(propagate=False)`` returns a truthy value instead;
    * the ``task4`` handle exposes a ``KeyError`` instance via ``result``;
    * the ``task6`` handle yields ``None`` and reports ``SUCCESS``.
    """
    first_ok = self.app.AsyncResult(self.task1['id'])
    second_ok = self.app.AsyncResult(self.task2['id'])
    first_failed = self.app.AsyncResult(self.task3['id'])
    second_failed = self.app.AsyncResult(self.task4['id'])
    empty = self.app.AsyncResult(self.task6['id'])

    on_result = Mock(name='callback')

    # The callback check must directly follow the get() that triggers it.
    fetched = first_ok.get(callback=on_result)
    assert fetched == 'the'
    on_result.assert_called_with(first_ok.id, 'the')

    assert second_ok.get() == 'quick'

    # By default get() re-raises the stored exception ...
    with pytest.raises(KeyError):
        first_failed.get()
    # ... whereas propagate=False hands back a (truthy) value instead.
    assert first_failed.get(propagate=False)

    assert isinstance(second_failed.result, KeyError)
    assert first_ok.info == 'the'

    # A stored ``None`` is still reported as a successful outcome.
    assert empty.get() is None
    assert empty.state == states.SUCCESS
| 344 | |
| 345 | def test_get_when_ignored(self): |
| 346 | result = self.app.AsyncResult(uuid()) |
nothing calls this directly
no test coverage detected