(self, crit)
| 233 | |
| 234 | @patch('celery.worker.consumer.consumer.crit') |
| 235 | def test_on_decode_error(self, crit): |
| 236 | c = self.LoopConsumer() |
| 237 | |
| 238 | class MockMessage(Mock): |
| 239 | content_type = 'application/x-msgpack' |
| 240 | content_encoding = 'binary' |
| 241 | body = 'foobarbaz' |
| 242 | |
| 243 | message = MockMessage() |
| 244 | c.on_decode_error(message, KeyError('foo')) |
| 245 | assert message.ack.call_count |
| 246 | assert "Can't decode message body" in crit.call_args[0][0] |
| 247 | |
| 248 | def _get_on_message(self, c): |
| 249 | if c.qos is None: |
nothing calls this directly
no test coverage detected