(self)
| 249 | assert self.b._create_binding('id') == self.b.binding |
| 250 | |
def test_on_task_call(self):
    """Check that ``on_task_call`` declares the backend's binding.

    ``celery.backends.rpc.maybe_declare`` is patched so the test can
    inspect its most recent call: it must have received the result of
    ``self.b.binding(prod.channel)`` together with ``retry=True``.
    """
    # Enter the patch first, then borrow a producer from the pool; both
    # are released in reverse order when the block exits.
    with patch('celery.backends.rpc.maybe_declare') as md, \
            self.app.amqp.producer_pool.acquire() as prod:
        self.b.on_task_call(prod, 'task_id')
        md.assert_called_with(
            self.b.binding(prod.channel),
            retry=True,
        )
| 259 | |
| 260 | def test_create_exchange(self): |
| 261 | ex = self.b._create_exchange('name') |
nothing calls this directly
no test coverage detected