hub / github.com/celery/celery / test_send_task_message__headers

Method test_send_task_message__headers

t/unit/app/test_amqp.py:263–270
(self)

Source from the content-addressed store, hash-verified

261         assert prod.publish.call_args[1]['foo'] == 1
262
263     def test_send_task_message__headers(self):
264         prod = Mock(name='producer')
265         self.app.amqp.send_task_message(
266             prod, 'foo', self.simple_message_no_sent_event,
267             headers={'x1x': 'y2x'},
268             retry=False,
269         )
270         assert prod.publish.call_args[1]['headers']['x1x'] == 'y2x'
271
272     def test_send_task_message__queue_string(self):
273         prod = Mock(name='producer')
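
The assertion on line 270 relies on `Mock.call_args` behaving as an `(args, kwargs)` pair, so index `1` selects the keyword arguments passed to `publish`. The sketch below illustrates that pattern in isolation. `send_with_headers` is a hypothetical stand-in written for this example; it is not Celery's `send_task_message` and makes no claim about how Celery builds the published message.

```python
from unittest.mock import Mock


def send_with_headers(producer, body, headers=None, **extra):
    """Hypothetical sender (not Celery's API): forwards headers to publish()."""
    producer.publish(body, headers=dict(headers or {}), **extra)


prod = Mock(name='producer')
send_with_headers(prod, {'task': 'foo'}, headers={'x1x': 'y2x'}, retry=False)

# call_args unpacks into positional args and keyword args of the last call.
args, kwargs = prod.publish.call_args
assert args == ({'task': 'foo'},)

# Index 1 is the kwargs dict, the same lookup the test performs.
assert prod.publish.call_args[1]['headers']['x1x'] == 'y2x'
assert kwargs['retry'] is False
```

Because `prod` is a plain `Mock`, nothing is actually published; the test only checks what the caller handed to `publish`.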

Callers

nothing calls this directly

Calls 1

send_task_message · Method · 0.80

Tested by

no test coverage detected