(self)
| 261 | assert prod.publish.call_args[1]['foo'] == 1 |
| 262 | |
def test_send_task_message__headers(self):
    """Check that a custom header given to ``send_task_message`` shows up
    in the ``headers`` keyword argument of the producer's ``publish`` call.
    """
    producer = Mock(name='producer')
    custom_headers = {'x1x': 'y2x'}

    self.app.amqp.send_task_message(
        producer,
        'foo',
        self.simple_message_no_sent_event,
        headers=custom_headers,
        retry=False,
    )

    # Index 1 of ``call_args`` holds the keyword arguments of the last call.
    publish_kwargs = producer.publish.call_args[1]
    assert publish_kwargs['headers']['x1x'] == 'y2x'
| 271 | |
| 272 | def test_send_task_message__queue_string(self): |
| 273 | prod = Mock(name='producer') |
nothing calls this directly
no test coverage detected