MCPcopy
hub / github.com/celery/celery / test_dump_reserved

Method test_dump_reserved

t/unit/worker/test_control.py:431–445  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

429 assert panel.handle('dump_schedule')
430
def test_dump_reserved(self):
    """Check ``dump_reserved`` lists a reserved request, then nothing once cleared."""
    consumer = Consumer(self.app)
    message = self.TaskMessage(self.mytask.name, args=(2, 2))
    # Bind the request to a local for the whole test: a reference must be
    # kept for the reserved_tasks WeakSet.
    request = Request(message, app=self.app)
    worker_state.task_reserved(request)
    try:
        panel = self.create_panel(consumer=consumer)
        reserved = panel.handle('dump_reserved', {'safe': True})
        first = reserved[0]
        assert first['name'] == self.mytask.name
        assert first['hostname'] == socket.gethostname()
        # With the reserved set emptied, the command must report nothing.
        worker_state.reserved_requests.clear()
        assert not panel.handle('dump_reserved')
    finally:
        # Always reset the shared worker state, even if an assertion failed.
        worker_state.reserved_requests.clear()
446
447 def test_rate_limit_invalid_rate_limit_string(self):
448 e = self.panel.handle('rate_limit', arguments={

Callers

nothing calls this directly

Calls 5

create_panelMethod · 0.95
RequestClass · 0.90
ConsumerClass · 0.70
handleMethod · 0.45
clearMethod · 0.45

Tested by

no test coverage detected