(self, _set_pdeathsig, set_mp_process_title, restore_logging)
| 102 | |
| 103 | @patch('celery.platforms.set_pdeathsig') |
| 104 | def test_pdeath_sig(self, _set_pdeathsig, set_mp_process_title, restore_logging): |
| 105 | from celery import signals |
| 106 | on_worker_process_init = Mock() |
| 107 | signals.worker_process_init.connect(on_worker_process_init) |
| 108 | from celery.concurrency.prefork import process_initializer |
| 109 | |
| 110 | with self.Celery(loader=self.Loader) as app: |
| 111 | app.conf = AttributeDict(DEFAULTS) |
| 112 | process_initializer(app, 'awesome.worker.com') |
| 113 | _set_pdeathsig.assert_called_once_with('SIGKILL') |
| 114 | |
| 115 | |
| 116 | class test_process_destructor: |
nothing calls this directly
no test coverage detected