MCPcopy
hub / github.com/celery/celery / test_flush_events

Method test_flush_events

t/unit/worker/test_consumer.py:186–192  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

184 assert c._maximum_prefetch_restored is True
185
186 def test_flush_events(self):
187 c = self.get_consumer()
188 c.event_dispatcher = None
189 c._flush_events()
190 c.event_dispatcher = Mock(name='evd')
191 c._flush_events()
192 c.event_dispatcher.flush.assert_called_with()
193
194 def test_on_send_event_buffered(self):
195 c = self.get_consumer()

Callers

nothing calls this directly

Calls 2

_flush_eventsMethod · 0.80
get_consumerMethod · 0.45

Tested by

no test coverage detected