MCPcopy
hub / github.com/celery/celery / test_on_after_fork

Method test_on_after_fork

t/unit/backends/test_redis.py:212–234  ·  view source on GitHub ↗
(self, parent_method)

Source from the content-addressed store, hash-verified

210
211 @patch('celery.backends.asynchronous.BaseResultConsumer.on_after_fork')
212 def test_on_after_fork(self, parent_method):
213 consumer = self.get_consumer()
214 consumer.start('none')
215 consumer.on_after_fork()
216 parent_method.assert_called_once()
217 consumer.backend.client.connection_pool.reset.assert_called_once()
218 consumer._pubsub.close.assert_called_once()
219 # PubSub instance not initialized - exception would be raised
220 # when calling .close()
221 consumer._pubsub = None
222 parent_method.reset_mock()
223 consumer.backend.client.connection_pool.reset.reset_mock()
224 consumer.on_after_fork()
225 parent_method.assert_called_once()
226 consumer.backend.client.connection_pool.reset.assert_called_once()
227
228 # Continues on KeyError
229 consumer._pubsub = Mock()
230 consumer._pubsub.close = Mock(side_effect=KeyError)
231 parent_method.reset_mock()
232 consumer.backend.client.connection_pool.reset.reset_mock()
233 consumer.on_after_fork()
234 parent_method.assert_called_once()
235
236 @patch('celery.backends.redis.ResultConsumer.cancel_for')
237 @patch('celery.backends.asynchronous.BaseResultConsumer.on_state_change')

Callers

nothing calls this directly

Calls 3

get_consumerMethod · 0.95
startMethod · 0.45
on_after_forkMethod · 0.45

Tested by

no test coverage detected