(self)
| 1280 | assert isinstance(meta['result'], KeyError) |
| 1281 | |
| 1282 | def test_execute_using_pool(self): |
| 1283 | tid = uuid() |
| 1284 | job = self.xRequest(id=tid, args=[4]) |
| 1285 | p = Mock() |
| 1286 | job.execute_using_pool(p) |
| 1287 | p.apply_async.assert_called_once() |
| 1288 | trace = p.apply_async.call_args[0][0] |
| 1289 | assert trace == trace_task_ret |
| 1290 | args = p.apply_async.call_args[1]['args'] |
| 1291 | assert args[0] == self.mytask.name |
| 1292 | assert args[1] == tid |
| 1293 | assert args[2] == job.request_dict |
| 1294 | assert args[3] == job.message.body |
| 1295 | |
| 1296 | def test_execute_using_pool_fast_trace_task(self): |
| 1297 | self.app.use_fast_trace_task = True |
nothing calls this directly
no test coverage detected