MCPcopy
hub / github.com/celery/celery / test_setup_nolimit

Method test_setup_nolimit

t/unit/app/test_amqp.py:27–43  ·  view source on GitHub ↗
(self, app)

Source from the content-addressed store, hash-verified

25class test_ProducerPool:
26
27 def test_setup_nolimit(self, app):
28 app.conf.broker_pool_limit = None
29 try:
30 delattr(app, '_pool')
31 except AttributeError:
32 pass
33 app.amqp._producer_pool = None
34 pool = app.amqp.producer_pool
35 assert pool.limit == app.pool.limit
36 assert not pool._resource.queue
37
38 r1 = pool.acquire()
39 r2 = pool.acquire()
40 r1.release()
41 r2.release()
42 r1 = pool.acquire()
43 r2 = pool.acquire()
44
45 def test_setup(self, app):
46 app.conf.broker_pool_limit = 2

Callers

nothing calls this directly

Calls 2

acquireMethod · 0.80
releaseMethod · 0.80

Tested by

no test coverage detected