(self)
| 264 | task_reserved.assert_called_with(bucket) |
| 265 | |
def test_max_restarts_exceeded(self):
    """Check that ``start()`` sleeps for 1 when the restart limit is hit.

    The blueprint start is rigged to fail with ``socket.error`` and the
    restart-state step is rigged to raise ``RestartFreqExceeded``; the
    test then asserts the patched ``sleep`` was called with ``1``.
    """
    consumer = self.get_consumer()

    def close_then_exceed(*args, **kwargs):
        # Flag the blueprint as CLOSE before raising -- presumably so the
        # consumer's start loop exits instead of retrying (confirm
        # against Consumer.start).
        consumer.blueprint.state = CLOSE
        raise RestartFreqExceeded()

    consumer.blueprint.start.side_effect = socket.error()
    consumer._restart_state.step.side_effect = close_then_exceed

    # Patch out sleep so the test does not actually block.
    with patch('celery.worker.consumer.consumer.sleep') as fake_sleep:
        consumer.start()
        fake_sleep.assert_called_with(1)
| 279 | |
| 280 | def test_do_not_restart_when_closed(self): |
| 281 | c = self.get_consumer() |
nothing calls this directly
no test coverage detected