(*args)
| 299 | verbose (bool, optional): Whether to print the type of shutdown. Defaults to True. |
| 300 | """ |
# _handle_request: the callable registered for `sig` (see the
# platforms.signals[sig] assignment that follows this definition). It accepts
# and ignores any positional arguments.
#
# Within in_sighandler() it:
#   * checks whether current_process()._name is 'MainProcess'; the visible
#     statements then call callback(worker) when a callback is set, print the
#     shutdown type through safe_say to sys.__stdout__ when verbose is set,
#     and send signals.worker_shutting_down with the worker hostname, sig,
#     how and exitcode;
#   * sets state.should_stop (how == 'Warm') or state.should_terminate
#     (how == 'Cold') to exitcode. Any other value of `how` raises KeyError
#     from the dict lookup.
#
# celery.worker.state is imported inside the handler rather than at module
# level. NOTE(review): presumably to avoid an import cycle or import-time
# cost -- confirm.
#
# NOTE(review): this listing has lost its indentation, so the exact nesting of
# the statements (in particular whether the setattr is guarded by the
# MainProcess check or runs in every process) cannot be confirmed from here;
# verify against the original file before relying on the summary above.
| 301 | def _handle_request(*args): |
| 302 | with in_sighandler(): |
| 303 | from celery.worker import state |
| 304 | if current_process()._name == 'MainProcess': |
| 305 | if callback: |
| 306 | callback(worker) |
| 307 | if verbose: |
| 308 | safe_say(f'worker: {how} shutdown (MainProcess)', sys.__stdout__) |
| 309 | signals.worker_shutting_down.send( |
| 310 | sender=worker.hostname, sig=sig, how=how, |
| 311 | exitcode=exitcode, |
| 312 | ) |
| 313 | setattr(state, {'Warm': 'should_stop', |
| 314 | 'Cold': 'should_terminate'}[how], exitcode) |
| 315 | _handle_request.__name__ = str(f'worker_{how}') |
| 316 | platforms.signals[sig] = _handle_request |
| 317 |
nothing calls this directly
no test coverage detected