MCPcopy
hub / github.com/celery/celery / test_pdeath_sig

Method test_pdeath_sig

t/unit/concurrency/test_prefork.py:104–113  ·  view source on GitHub ↗
(self, _set_pdeathsig, set_mp_process_title, restore_logging)

Source from the content-addressed store, hash-verified

102
103 @patch('celery.platforms.set_pdeathsig')
104 def test_pdeath_sig(self, _set_pdeathsig, set_mp_process_title, restore_logging):
105 from celery import signals
106 on_worker_process_init = Mock()
107 signals.worker_process_init.connect(on_worker_process_init)
108 from celery.concurrency.prefork import process_initializer
109
110 with self.Celery(loader=self.Loader) as app:
111 app.conf = AttributeDict(DEFAULTS)
112 process_initializer(app, 'awesome.worker.com')
113 _set_pdeathsig.assert_called_once_with('SIGKILL')
114
115
116class test_process_destructor:

Callers

nothing calls this directly

Calls 3

AttributeDictClass · 0.90
process_initializerFunction · 0.90
connectMethod · 0.45

Tested by

no test coverage detected