(self)
| 365 | c.start() |
| 366 | |
def test_collects_at_restart(self):
    # Scenario: the blueprint fails to start with a socket error and
    # ``connection.collect`` itself raises MemoryError.  The test passes
    # when ``collect`` was still invoked (with no arguments) by ``start``.
    consumer = self.get_consumer()

    # Arrange the failure modes on the consumer's collaborators.
    consumer.connection.collect.side_effect = MemoryError()
    blueprint = consumer.blueprint
    blueprint.start.side_effect = socket.error()
    # NOTE(review): ``_closer`` is defined elsewhere in this test class;
    # presumably it makes the restart end the run loop -- confirm.
    blueprint.restart.side_effect = self._closer(consumer)

    consumer.start()

    # Re-read the attribute after ``start`` rather than caching it, so the
    # assertion targets whatever ``connection.collect`` is at this point.
    consumer.connection.collect.assert_called_with()
| 374 | |
| 375 | def test_register_with_event_loop(self): |
| 376 | c = self.get_consumer() |
nothing calls this directly
no test coverage detected