hub / github.com/celery/celery / test_on_task_call

Method test_on_task_call

t/unit/backends/test_rpc.py:251–258
(self)

Source from the content-addressed store, hash-verified

249         assert self.b._create_binding('id') == self.b.binding
250
251     def test_on_task_call(self):
252         with patch('celery.backends.rpc.maybe_declare') as md:
253             with self.app.amqp.producer_pool.acquire() as prod:
254                 self.b.on_task_call(prod, 'task_id'),
255                 md.assert_called_with(
256                     self.b.binding(prod.channel),
257                     retry=True,
258                 )
259
260     def test_create_exchange(self):
261         ex = self.b._create_exchange('name')

Callers

nothing calls this directly

Calls 3

acquire · Method · 0.80
binding · Method · 0.80
on_task_call · Method · 0.45

Tested by

no test coverage detected