MCPcopy
hub / github.com/celery/celery / test_dump_schedule

Method test_dump_schedule

t/unit/worker/test_control.py:415–429  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

413 worker_state.revoked.clear()
414
415 def test_dump_schedule(self):
416 consumer = Consumer(self.app)
417 panel = self.create_panel(consumer=consumer)
418 assert not panel.handle('dump_schedule')
419 r = Request(
420 self.TaskMessage(self.mytask.name, 'CAFEBABE'),
421 app=self.app,
422 )
423 consumer.timer.schedule.enter_at(
424 consumer.timer.Entry(lambda x: x, (r,)),
425 datetime.now() + timedelta(seconds=10))
426 consumer.timer.schedule.enter_at(
427 consumer.timer.Entry(lambda x: x, (object(),)),
428 datetime.now() + timedelta(seconds=10))
429 assert panel.handle('dump_schedule')
430
431 def test_dump_reserved(self):
432 consumer = Consumer(self.app)

Callers

nothing calls this directly

Calls 5

create_panelMethod · 0.95
RequestClass · 0.90
ConsumerClass · 0.70
handleMethod · 0.45
nowMethod · 0.45

Tested by

no test coverage detected