MCPcopy
hub / github.com/celery/celery / test_create_task_handler

Method test_create_task_handler

t/unit/worker/test_consumer.py:157–184  ·  view source on GitHub ↗
(self, subtests)

Source from the content-addressed store, hash-verified

155 assert c._restore_prefetch_count_after_connection_restart(None) is None
156
157 def test_create_task_handler(self, subtests):
158 c = self.get_consumer()
159 c.qos = MagicMock()
160 c.qos.value = 1
161 c._maximum_prefetch_restored = False
162
163 sig = self.add.s(2, 2)
164 message = self.task_message_from_sig(self.app, sig)
165
166 def raise_exception():
167 raise KeyError('Foo')
168
169 def strategy(_, __, ack_log_error_promise, ___, ____):
170 ack_log_error_promise()
171
172 c.strategies[sig.task] = strategy
173 c.call_soon = raise_exception
174 on_task_received = c.create_task_handler()
175 on_task_received(message)
176
177 with subtests.test("initial prefetch count is never 0"):
178 assert c.initial_prefetch_count != 0
179
180 with subtests.test("initial prefetch count is 2"):
181 assert c.initial_prefetch_count == 2
182
183 with subtests.test("maximum prefetch is reached"):
184 assert c._maximum_prefetch_restored is True
185
186 def test_flush_events(self):
187 c = self.get_consumer()

Callers

nothing calls this directly

Calls 4

create_task_handlerMethod · 0.80
testMethod · 0.80
get_consumerMethod · 0.45
sMethod · 0.45

Tested by

no test coverage detected