(self, subtests)
| 155 | assert c._restore_prefetch_count_after_connection_restart(None) is None |
| 156 | |
| 157 | def test_create_task_handler(self, subtests): |
| 158 | c = self.get_consumer() |
| 159 | c.qos = MagicMock() |
| 160 | c.qos.value = 1 |
| 161 | c._maximum_prefetch_restored = False |
| 162 | |
| 163 | sig = self.add.s(2, 2) |
| 164 | message = self.task_message_from_sig(self.app, sig) |
| 165 | |
| 166 | def raise_exception(): |
| 167 | raise KeyError('Foo') |
| 168 | |
| 169 | def strategy(_, __, ack_log_error_promise, ___, ____): |
| 170 | ack_log_error_promise() |
| 171 | |
| 172 | c.strategies[sig.task] = strategy |
| 173 | c.call_soon = raise_exception |
| 174 | on_task_received = c.create_task_handler() |
| 175 | on_task_received(message) |
| 176 | |
| 177 | with subtests.test("initial prefetch count is never 0"): |
| 178 | assert c.initial_prefetch_count != 0 |
| 179 | |
| 180 | with subtests.test("initial prefetch count is 2"): |
| 181 | assert c.initial_prefetch_count == 2 |
| 182 | |
| 183 | with subtests.test("maximum prefetch is reached"): |
| 184 | assert c._maximum_prefetch_restored is True |
| 185 | |
| 186 | def test_flush_events(self): |
| 187 | c = self.get_consumer() |
nothing calls this directly
no test coverage detected