MCPcopy
hub / github.com/celery/celery / test_send_event_exchange_string

Method test_send_event_exchange_string

t/unit/app/test_amqp.py:317–327  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

315 assert pub['exchange'] == ''
316
317 def test_send_event_exchange_string(self):
318 evd = Mock(name='evd')
319 self.app.amqp.send_task_message(
320 Mock(), 'foo', self.simple_message, retry=False,
321 exchange='xyz', routing_key='xyb',
322 event_dispatcher=evd,
323 )
324 evd.publish.assert_called()
325 event = evd.publish.call_args[0][1]
326 assert event['routing_key'] == 'xyb'
327 assert event['exchange'] == 'xyz'
328
329 def test_send_task_message__with_delivery_mode(self):
330 prod = Mock(name='producer')

Callers

nothing calls this directly

Calls 1

send_task_messageMethod · 0.80

Tested by

no test coverage detected