Pool child process initializer. Initialize the child pool process to ensure the correct app instance is used and things like logging works.
(app, hostname)
| 39 | |
| 40 | |
| 41 | def process_initializer(app, hostname): |
| 42 | """Pool child process initializer. |
| 43 | |
| 44 | Initialize the child pool process to ensure the correct |
| 45 | app instance is used and things like logging works. |
| 46 | """ |
| 47 | # Each running worker gets SIGKILL by OS when main process exits. |
| 48 | platforms.set_pdeathsig('SIGKILL') |
| 49 | _set_task_join_will_block(True) |
| 50 | platforms.signals.reset(*WORKER_SIGRESET) |
| 51 | platforms.signals.ignore(*WORKER_SIGIGNORE) |
| 52 | platforms.set_mp_process_title('celeryd', hostname=hostname) |
| 53 | # This is for Windows and other platforms not supporting |
| 54 | # fork(). Note that init_worker makes sure it's only |
| 55 | # run once per process. |
| 56 | app.loader.init_worker() |
| 57 | app.loader.init_worker_process() |
| 58 | logfile = os.environ.get('CELERY_LOG_FILE') or None |
| 59 | if logfile and '%i' in logfile.lower(): |
| 60 | # logfile path will differ so need to set up logging again. |
| 61 | app.log.already_setup = False |
| 62 | app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0), |
| 63 | logfile, |
| 64 | bool(os.environ.get('CELERY_LOG_REDIRECT', False)), |
| 65 | str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')), |
| 66 | hostname=hostname) |
| 67 | if os.environ.get('FORKED_BY_MULTIPROCESSING'): |
| 68 | # pool did execv after fork |
| 69 | trace.setup_worker_optimizations(app, hostname) |
| 70 | else: |
| 71 | app.set_current() |
| 72 | set_default_app(app) |
| 73 | app.finalize() |
| 74 | trace._tasks = app._tasks # enables fast_trace_task optimization. |
| 75 | # rebuild execution handler for all tasks. |
| 76 | from celery.app.trace import build_tracer |
| 77 | for name, task in app.tasks.items(): |
| 78 | task.__trace__ = build_tracer(name, task, app.loader, hostname, |
| 79 | app=app) |
| 80 | from celery.worker import state as worker_state |
| 81 | worker_state.reset_state() |
| 82 | signals.worker_process_init.send(sender=None) |
| 83 | |
| 84 | |
| 85 | def process_destructor(pid, exitcode): |