hub / github.com/celery/celery / test_disable_events

Method test_disable_events

t/unit/worker/test_control.py:156–164
(self)

Source from the content-addressed store, hash-verified

154          assert 'already enabled' in panel.handle('enable_events')['ok']
155
156      def test_disable_events(self):
157          consumer = Consumer(self.app)
158          panel = self.create_panel(consumer=consumer)
159          evd = consumer.event_dispatcher
160          evd.enabled = True
161          evd.groups = {'task'}
162          panel.handle('disable_events')
163          assert 'task' not in evd.groups
164          assert 'already disabled' in panel.handle('disable_events')['ok']
165
166      def test_clock(self):
167          consumer = Consumer(self.app)
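
The behavior this test asserts can be sketched without Celery installed. The snippet below is a minimal illustration only: `FakeDispatcher` and the standalone `disable_events` function are hypothetical stand-ins, not Celery's classes or its actual handler, and the reply strings are assumptions chosen to satisfy the assertions above. It shows the idempotent pattern the test checks: the first call drops the 'task' group, and a repeated call reports that events are already disabled.

```python
# Hypothetical stand-ins for illustration; these are NOT Celery's classes.
class FakeDispatcher:
    """Mimics only the two attributes the test touches."""

    def __init__(self):
        self.enabled = True
        self.groups = {'task'}


def disable_events(dispatcher):
    """Assumed handler behavior, inferred from the test's assertions."""
    if 'task' in dispatcher.groups:
        # First call: remove the group and acknowledge.
        dispatcher.groups.discard('task')
        return {'ok': 'task events disabled'}  # reply text is an assumption
    # Repeated call: nothing to remove, so report the no-op.
    return {'ok': 'task events already disabled'}  # reply text is an assumption


evd = FakeDispatcher()
first = disable_events(evd)   # removes 'task' from evd.groups
second = disable_events(evd)  # no-op; reply mentions 'already disabled'
```

Calling the handler twice mirrors lines 162 and 164 of the test, where the second `panel.handle('disable_events')` call is expected to return an 'ok' reply containing 'already disabled'.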

Callers

nothing calls this directly

Calls 3

create_panel · Method · 0.95
Consumer · Class · 0.70
handle · Method · 0.45

Tested by

no test coverage detected