MCPcopy
hub / github.com/celery/celery / test_accept_content

Method test_accept_content

t/unit/app/test_amqp.py:14–22  ·  view source on GitHub ↗
(self, app)

Source from the content-addressed store, hash-verified

12class test_TaskConsumer:
13
14 def test_accept_content(self, app):
15 with app.pool.acquire(block=True) as con:
16 app.conf.accept_content = ['application/json']
17 assert app.amqp.TaskConsumer(con).accept == {
18 'application/json',
19 }
20 assert app.amqp.TaskConsumer(con, accept=['json']).accept == {
21 'application/json',
22 }
23
24
25class test_ProducerPool:

Callers

nothing calls this directly

Calls 2

acquireMethod · 0.80
TaskConsumerMethod · 0.80

Tested by

no test coverage detected