(self)
| 413 | worker_state.revoked.clear() |
| 414 | |
def test_dump_schedule(self):
    """Check the ``dump_schedule`` reply before and after scheduling.

    The panel is expected to give a falsy reply while nothing has been
    added to the consumer's timer schedule, and a truthy reply once two
    entries are queued: one whose argument is a ``Request`` and one whose
    argument is a plain ``object()``.
    """
    consumer = Consumer(self.app)
    panel = self.create_panel(consumer=consumer)

    # Nothing has been scheduled on this consumer yet.
    assert not panel.handle('dump_schedule')

    request = Request(
        self.TaskMessage(self.mytask.name, 'CAFEBABE'),
        app=self.app,
    )
    timer = consumer.timer
    delay = timedelta(seconds=10)
    # Queue one entry carrying a Request and one carrying an arbitrary
    # object, each due ten seconds from the moment it is queued.
    for payload in (request, object()):
        entry = timer.Entry(lambda x: x, (payload,))
        timer.schedule.enter_at(entry, datetime.now() + delay)

    assert panel.handle('dump_schedule')
| 430 | |
| 431 | def test_dump_reserved(self): |
| 432 | consumer = Consumer(self.app) |
nothing calls this directly
no test coverage detected