MCPcopy
hub / github.com/celery/celery / test_update_state

Method test_update_state

t/unit/tasks/test_tasks.py:1299–1320  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1297 self.mytask.pop_request()
1298
1299 def test_update_state(self):
1300
1301 @self.app.task(shared=False)
1302 def yyy():
1303 pass
1304
1305 yyy.push_request()
1306 try:
1307 tid = uuid()
1308 # update_state should accept arbitrary kwargs, which are passed to
1309 # the backend store_result method
1310 yyy.update_state(tid, 'FROBULATING', {'fooz': 'baaz'},
1311 arbitrary_kwarg=None)
1312 assert yyy.AsyncResult(tid).status == 'FROBULATING'
1313 assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
1314
1315 yyy.request.id = tid
1316 yyy.update_state(state='FROBUZATING', meta={'fooz': 'baaz'})
1317 assert yyy.AsyncResult(tid).status == 'FROBUZATING'
1318 assert yyy.AsyncResult(tid).result == {'fooz': 'baaz'}
1319 finally:
1320 yyy.pop_request()
1321
1322 def test_update_state_passes_request_to_backend(self):
1323 backend = Mock()

Callers

nothing calls this directly

Calls 4

push_requestMethod · 0.80
pop_requestMethod · 0.80
update_stateMethod · 0.45
AsyncResultMethod · 0.45

Tested by

no test coverage detected