MCPcopy
hub / github.com/celery/celery / process_initializer

Function process_initializer

celery/concurrency/prefork.py:41–82  ·  view source on GitHub ↗

Pool child process initializer. Initializes the child pool process to ensure that the correct app instance is used and that things like logging work.

(app, hostname)

Source from the content-addressed store, hash-verified

39
40
def process_initializer(app, hostname):
    """Prepare a newly started pool child process.

    Sets up signal handling and the process title, runs the loader's
    worker hooks, configures logging from the ``CELERY_LOG_*``
    environment variables, installs ``app`` (or, when
    ``FORKED_BY_MULTIPROCESSING`` is set, calls
    ``trace.setup_worker_optimizations`` instead), rebuilds the tracer
    of every registered task, resets the worker state and finally
    sends the ``worker_process_init`` signal.

    Arguments:
        app: The application instance the child process should use.
        hostname: Node name handed to the process title, the logging
            setup and the task tracers.
    """
    # Have the OS deliver SIGKILL to this child once the main process exits.
    platforms.set_pdeathsig('SIGKILL')
    _set_task_join_will_block(True)
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)

    # Required on Windows and any other platform without fork();
    # init_worker itself makes sure it only runs once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()

    log_path = os.environ.get('CELERY_LOG_FILE') or None
    if log_path and '%i' in log_path.lower():
        # The log file path differs here, so logging must be set up again.
        app.log.already_setup = False
    log_level = int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0)
    redirect = bool(os.environ.get('CELERY_LOG_REDIRECT', False))
    redirect_level = str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL'))
    app.log.setup(
        log_level, log_path, redirect, redirect_level,
        hostname=hostname,
    )

    if not os.environ.get('FORKED_BY_MULTIPROCESSING'):
        app.set_current()
        set_default_app(app)
        app.finalize()
        # Sharing the task registry enables the fast_trace_task optimization.
        trace._tasks = app._tasks
    else:
        # The pool did execv after fork.
        trace.setup_worker_optimizations(app, hostname)

    # Rebuild the execution handler of every task.
    from celery.app.trace import build_tracer
    for task_name, task_obj in app.tasks.items():
        task_obj.__trace__ = build_tracer(
            task_name, task_obj, app.loader, hostname, app=app,
        )

    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)
83
84
85def process_destructor(pid, exitcode):

Callers 2

test_pdeath_sigMethod · 0.90

Calls 13

set_default_appFunction · 0.90
build_tracerFunction · 0.90
init_workerMethod · 0.80
init_worker_processMethod · 0.80
set_currentMethod · 0.80
resetMethod · 0.45
ignoreMethod · 0.45
getMethod · 0.45
setupMethod · 0.45
finalizeMethod · 0.45
itemsMethod · 0.45

Tested by 2

test_pdeath_sigMethod · 0.72