(self)
| 1266 | assert meta['status'] == states.SUCCESS |
| 1267 | |
def test_execute_fail(self):
    """Execute a request for ``mytask_raising`` and check the failure.

    The call to ``execute()`` is expected to hand back an
    ``ExceptionInfo``, and the backend's stored task meta must show
    the ``FAILURE`` state with a ``KeyError`` instance as the result.
    """
    task_id = uuid()
    failing_task = self.mytask_raising
    request = self.xRequest(
        name=failing_task.name, id=task_id, args=[4], kwargs={},
    )

    # The failure is returned as an ExceptionInfo rather than raised here.
    outcome = request.execute()
    assert isinstance(outcome, ExceptionInfo)

    # NOTE(review): presumably the pickle serializer is what lets the
    # stored result come back as a real KeyError instance -- confirm.
    result_backend = failing_task.backend
    assert result_backend.serializer == 'pickle'

    stored_meta = result_backend.get_task_meta(task_id)
    assert stored_meta['status'] == states.FAILURE
    assert isinstance(stored_meta['result'], KeyError)
| 1281 | |
| 1282 | def test_execute_using_pool(self): |
| 1283 | tid = uuid() |
nothing calls this directly
no test coverage detected