MCPcopy
hub / github.com/celery/celery / test_receieve_message

Method test_receieve_message

t/unit/worker/test_worker.py:264–279  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

262 return c.task_consumer.on_message
263
264 def test_receieve_message(self):
265 c = self.LoopConsumer()
266 c.blueprint.state = RUN
267 m = self.create_task_message(
268 Mock(), self.foo_task.name,
269 args=[2, 4, 8], kwargs={},
270 )
271 c.update_strategies()
272 callback = self._get_on_message(c)
273 callback(m)
274
275 in_bucket = self.buffer.get_nowait()
276 assert isinstance(in_bucket, Request)
277 assert in_bucket.name == self.foo_task.name
278 assert in_bucket.execute() == 2 * 4 * 8
279 assert self.timer.empty()
280
281 def test_start_channel_error(self):
282 def loop_side_effect():

Callers

nothing calls this directly

Calls 7

LoopConsumerMethod · 0.95
_get_on_messageMethod · 0.95
callbackFunction · 0.85
update_strategiesMethod · 0.80
emptyMethod · 0.80
create_task_messageMethod · 0.45
executeMethod · 0.45

Tested by

no test coverage detected