MCPcopy
hub / github.com/celery/celery / test_on_decode_error

Method test_on_decode_error

t/unit/worker/test_worker.py:235–246  ·  view source on GitHub ↗
(self, crit)

Source from the content-addressed store, hash-verified

233
234 @patch('celery.worker.consumer.consumer.crit')
235 def test_on_decode_error(self, crit):
236 c = self.LoopConsumer()
237
238 class MockMessage(Mock):
239 content_type = 'application/x-msgpack'
240 content_encoding = 'binary'
241 body = 'foobarbaz'
242
243 message = MockMessage()
244 c.on_decode_error(message, KeyError('foo'))
245 assert message.ack.call_count
246 assert "Can't decode message body" in crit.call_args[0][0]
247
248 def _get_on_message(self, c):
249 if c.qos is None:

Callers

nothing calls this directly

Calls 3

LoopConsumerMethod · 0.95
MockMessageClass · 0.85
on_decode_errorMethod · 0.80

Tested by

no test coverage detected