MCPcopy
hub / github.com/celery/celery / test_query_task

Method test_query_task

t/unit/worker/test_control.py:792–817  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

790 _import.assert_not_called()
791
792 def test_query_task(self):
793 consumer = Consumer(self.app)
794 consumer.controller = _WC(app=self.app)
795 consumer.controller.consumer = consumer
796 panel = self.create_panel(consumer=consumer)
797 panel.app = self.app
798 req1 = Request(
799 self.TaskMessage(self.mytask.name, args=(2, 2)),
800 app=self.app,
801 )
802 worker_state.task_reserved(req1)
803 try:
804 assert not panel.handle('query_task', {'ids': {'1daa'}})
805 ret = panel.handle('query_task', {'ids': {req1.id}})
806 assert req1.id in ret
807 assert ret[req1.id][0] == 'reserved'
808 worker_state.active_requests.add(req1)
809 try:
810 ret = panel.handle('query_task', {'ids': {req1.id}})
811 assert ret[req1.id][0] == 'active'
812 finally:
813 worker_state.active_requests.clear()
814 ret = panel.handle('query_task', {'ids': {req1.id}})
815 assert ret[req1.id][0] == 'reserved'
816 finally:
817 worker_state.reserved_requests.clear()
818
819 @patch('celery.Celery.backend', new=PropertyMock(name='backend'))
820 def test_revoke_backend_status_update(self):

Callers

nothing calls this directly

Calls 6

create_panelMethod · 0.95
RequestClass · 0.90
ConsumerClass · 0.70
handleMethod · 0.45
addMethod · 0.45
clearMethod · 0.45

Tested by

no test coverage detected