(self)
| 343 | c.loop(*c.loop_args()) |
| 344 | |
def test_loop(self):
    """Run the consumer loop twice and check the prefetch-count bookkeeping.

    A stub connection class is derived from the class of the app's read
    connection; its ``drain_events`` clears the consumer's ``connection``
    attribute and it reports that heartbeats are unsupported.  After two
    passes through ``loop`` the test checks that ``consume`` was called,
    that the channel-level ``qos`` callable received the initial prefetch
    count, and that ``decrement_eventually`` followed by ``update``
    lowers the value by one and pushes the new count.
    """
    connection_base = self.app.connection_for_read().__class__

    class Connection(connection_base):
        # Back-reference to the consumer under test; assigned below.
        obj = None

        def drain_events(self, **kwargs):
            # Drop the consumer's connection as soon as events are drained
            # (presumably what lets ``loop`` return -- confirm against the
            # loop implementation).
            self.obj.connection = None

        @property
        def supports_heartbeats(self):
            return False

    initial_prefetch = 10

    consumer = self.LoopConsumer()
    consumer.blueprint.state = RUN
    consumer.connection = Connection(self.app.conf.broker_url)
    consumer.connection.obj = consumer
    consumer.connection.get_heartbeat_interval = Mock(return_value=None)
    consumer.qos = QoS(consumer.task_consumer.qos, initial_prefetch)

    # Two consecutive passes through the loop.
    for _ in range(2):
        consumer.loop(*consumer.loop_args())

    assert consumer.task_consumer.consume.call_count
    consumer.task_consumer.qos.assert_called_with(
        prefetch_count=initial_prefetch,
    )
    assert consumer.qos.value == initial_prefetch

    # The decrement is visible on ``value`` immediately; ``update`` then
    # forwards the new count to the channel-level ``qos`` callable.
    consumer.qos.decrement_eventually()
    assert consumer.qos.value == initial_prefetch - 1
    consumer.qos.update()
    assert consumer.qos.value == initial_prefetch - 1
    consumer.task_consumer.qos.assert_called_with(
        prefetch_count=initial_prefetch - 1,
    )
| 374 | |
| 375 | def test_ignore_errors(self): |
| 376 | c = self.NoopConsumer() |
nothing calls this directly
no test coverage detected