(self, parent_method)
| 210 | |
| 211 | @patch('celery.backends.asynchronous.BaseResultConsumer.on_after_fork') |
| 212 | def test_on_after_fork(self, parent_method): |
| 213 | consumer = self.get_consumer() |
| 214 | consumer.start('none') |
| 215 | consumer.on_after_fork() |
| 216 | parent_method.assert_called_once() |
| 217 | consumer.backend.client.connection_pool.reset.assert_called_once() |
| 218 | consumer._pubsub.close.assert_called_once() |
| 219 | # PubSub instance not initialized - exception would be raised |
| 220 | # when calling .close() |
| 221 | consumer._pubsub = None |
| 222 | parent_method.reset_mock() |
| 223 | consumer.backend.client.connection_pool.reset.reset_mock() |
| 224 | consumer.on_after_fork() |
| 225 | parent_method.assert_called_once() |
| 226 | consumer.backend.client.connection_pool.reset.assert_called_once() |
| 227 | |
| 228 | # Continues on KeyError |
| 229 | consumer._pubsub = Mock() |
| 230 | consumer._pubsub.close = Mock(side_effect=KeyError) |
| 231 | parent_method.reset_mock() |
| 232 | consumer.backend.client.connection_pool.reset.reset_mock() |
| 233 | consumer.on_after_fork() |
| 234 | parent_method.assert_called_once() |
| 235 | |
| 236 | @patch('celery.backends.redis.ResultConsumer.cancel_for') |
| 237 | @patch('celery.backends.asynchronous.BaseResultConsumer.on_state_change') |
nothing calls this directly
no test coverage detected