(self)
| 1207 | Request(m, app=self.app) |
| 1208 | |
def test_execute(self):
    """Executing a request returns the result and records it in the backend.

    Builds a request with a fresh task id and ``args=[4]``, runs it via
    ``execute()``, then checks both the direct return value and the meta
    stored by the task's result backend.
    """
    task_id = uuid()
    request = self.xRequest(id=task_id, args=[4], kwargs={})

    # Run the request first; the backend meta is only read afterwards.
    returned = request.execute()
    assert returned == 256

    stored = self.mytask.backend.get_task_meta(task_id)
    assert stored['status'] == states.SUCCESS
    assert stored['result'] == 256
| 1216 | |
| 1217 | def test_execute_backend_error_acks_late(self): |
| 1218 | """direct call to execute should reject task in case of internal failure.""" |
nothing calls this directly
no test coverage detected