MCPcopy
hub / github.com/celery/celery / test_execute_fail

Method test_execute_fail

t/unit/worker/test_request.py:1268–1280  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1266 assert meta['status'] == states.SUCCESS
1267
1268 def test_execute_fail(self):
1269 tid = uuid()
1270 job = self.xRequest(
1271 name=self.mytask_raising.name,
1272 id=tid,
1273 args=[4],
1274 kwargs={},
1275 )
1276 assert isinstance(job.execute(), ExceptionInfo)
1277 assert self.mytask_raising.backend.serializer == 'pickle'
1278 meta = self.mytask_raising.backend.get_task_meta(tid)
1279 assert meta['status'] == states.FAILURE
1280 assert isinstance(meta['result'], KeyError)
1281
1282 def test_execute_using_pool(self):
1283 tid = uuid()

Callers

nothing calls this directly

Calls 3

xRequestMethod · 0.80
executeMethod · 0.45
get_task_metaMethod · 0.45

Tested by

no test coverage detected