MCPcopy
hub / github.com/celery/celery / test_execute_ack

Method test_execute_ack

t/unit/worker/test_request.py:1254–1266  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1252 assert meta['status'] == states.SUCCESS
1253
1254 def test_execute_ack(self):
1255 scratch = {'ACK': False}
1256
1257 def on_ack(*args, **kwargs):
1258 scratch['ACK'] = True
1259
1260 tid = uuid()
1261 job = self.xRequest(id=tid, args=[4], on_ack=on_ack)
1262 assert job.execute() == 256
1263 meta = self.mytask.backend.get_task_meta(tid)
1264 assert scratch['ACK']
1265 assert meta['result'] == 256
1266 assert meta['status'] == states.SUCCESS
1267
1268 def test_execute_fail(self):
1269 tid = uuid()

Callers

nothing calls this directly

Calls 3

xRequestMethod · 0.80
executeMethod · 0.45
get_task_metaMethod · 0.45

Tested by

no test coverage detected