MCPcopy
hub / github.com/celery/celery / create_task_message

Method create_task_message

t/unit/worker/test_worker.py:72–76  ·  view source on GitHub ↗
(self, channel, *args, **kwargs)

Source from the content-addressed store, hash-verified

70class ConsumerCase:
71
72 def create_task_message(self, channel, *args, **kwargs):
73 m = self.TaskMessage(*args, **kwargs)
74 m.channel = channel
75 m.delivery_info = {'consumer_tag': 'mock'}
76 return m
77
78
79class test_Consumer(ConsumerCase):

Calls

no outgoing calls

Tested by

no test coverage detected