(self)
| 315 | assert pub['exchange'] == '' |
| 316 | |
| 317 | def test_send_event_exchange_string(self): |
| 318 | evd = Mock(name='evd') |
| 319 | self.app.amqp.send_task_message( |
| 320 | Mock(), 'foo', self.simple_message, retry=False, |
| 321 | exchange='xyz', routing_key='xyb', |
| 322 | event_dispatcher=evd, |
| 323 | ) |
| 324 | evd.publish.assert_called() |
| 325 | event = evd.publish.call_args[0][1] |
| 326 | assert event['routing_key'] == 'xyb' |
| 327 | assert event['exchange'] == 'xyz' |
| 328 | |
| 329 | def test_send_task_message__with_delivery_mode(self): |
| 330 | prod = Mock(name='producer') |
nothing calls this directly
no test coverage detected