MCPcopy
hub / github.com/celery/celery / test_receive_message_unknown

Method test_receive_message_unknown

t/unit/worker/test_worker.py:190–199  ·  view source on GitHub ↗
(self, warn)

Source from the content-addressed store, hash-verified

188
189 @patch('celery.worker.consumer.consumer.warn')
190 def test_receive_message_unknown(self, warn):
191 c = self.LoopConsumer()
192 c.blueprint.state = RUN
193 c.steps.pop()
194 channel = Mock(name='.channeol')
195 m = create_message(channel, unknown={'baz': '!!!'})
196
197 callback = self._get_on_message(c)
198 callback(m)
199 warn.assert_called()
200
201 @patch('celery.worker.strategy.to_timestamp')
202 def test_receive_message_eta_OverflowError(self, to_timestamp):

Callers

nothing calls this directly

Calls 5

LoopConsumerMethod · 0.95
_get_on_messageMethod · 0.95
create_messageFunction · 0.85
callbackFunction · 0.85
popMethod · 0.45

Tested by

no test coverage detected