MCPcopy
hub / github.com/celery/celery / test_loop

Method test_loop

t/unit/worker/test_worker.py:345–373  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

343 c.loop(*c.loop_args())
344
345 def test_loop(self):
346
347 class Connection(self.app.connection_for_read().__class__):
348 obj = None
349
350 def drain_events(self, **kwargs):
351 self.obj.connection = None
352
353 @property
354 def supports_heartbeats(self):
355 return False
356
357 c = self.LoopConsumer()
358 c.blueprint.state = RUN
359 c.connection = Connection(self.app.conf.broker_url)
360 c.connection.obj = c
361 c.connection.get_heartbeat_interval = Mock(return_value=None)
362 c.qos = QoS(c.task_consumer.qos, 10)
363
364 c.loop(*c.loop_args())
365 c.loop(*c.loop_args())
366 assert c.task_consumer.consume.call_count
367 c.task_consumer.qos.assert_called_with(prefetch_count=10)
368 assert c.qos.value == 10
369 c.qos.decrement_eventually()
370 assert c.qos.value == 9
371 c.qos.update()
372 assert c.qos.value == 9
373 c.task_consumer.qos.assert_called_with(prefetch_count=9)
374
375 def test_ignore_errors(self):
376 c = self.NoopConsumer()

Callers

nothing calls this directly

Calls 5

LoopConsumerMethod · 0.95
loopMethod · 0.80
loop_argsMethod · 0.80
ConnectionClass · 0.70
updateMethod · 0.45

Tested by

no test coverage detected