MCPcopy
hub / github.com/celery/celery / on_stop

Method on_stop

celery/concurrency/prefork.py:142–188  ·  view source on GitHub ↗

Gracefully stop the pool.

(self)

Source from the content-addressed store, hash-verified

140 return reg(loop)
141
def on_stop(self):
    """Gracefully stop the pool.

    Does nothing unless a pool exists and its state is ``RUN`` or
    ``CLOSE``. Otherwise the pool is closed and joined, and ``self._pool``
    is reset to ``None`` once the join has returned.

    When ``get_event_loop()`` returns a hub, a daemon thread calls
    ``hub.fire_timers()`` roughly every 0.5 seconds for as long as the
    join is in progress. Exceptions raised by ``fire_timers()`` are logged
    as warnings and do not stop the thread. An exception raised by the
    pool join itself propagates to the caller after the helper thread has
    been signalled to stop.
    """
    if self._pool is not None and self._pool._state in (RUN, CLOSE):
        self._pool.close()

        # Keep firing timers (for heartbeats on async transports) while
        # the pool drains. If not using an async transport, no hub exists
        # and the timer thread is not created.
        hub = get_event_loop()
        if hub is not None:
            shutdown_event = threading.Event()

            def fire_timers_loop():
                while not shutdown_event.is_set():
                    try:
                        hub.fire_timers()
                    except Exception:
                        logger.warning(
                            "Exception in timer thread during prefork on_stop()",
                            exc_info=True,
                        )
                    # Pause up to 0.5 seconds between rounds to avoid
                    # excessive CPU usage. Waiting on the event (rather
                    # than an uninterruptible sleep) lets the thread wake
                    # as soon as shutdown is signalled, so the join below
                    # is not held up for the rest of the interval.
                    shutdown_event.wait(0.5)

            timer_thread = threading.Thread(
                target=fire_timers_loop,
                daemon=True,
                name="prefork-timer-shutdown",
            )
            timer_thread.start()

            try:
                self._pool.join()
            finally:
                # Always stop the helper thread, even if the join raised.
                shutdown_event.set()
                # Bounded wait: a fire_timers() call that is still running
                # must not block shutdown indefinitely. The thread is a
                # daemon, so it cannot keep the process alive either.
                timer_thread.join(timeout=1.0)

                if timer_thread.is_alive():
                    logger.warning(
                        "Timer thread in prefork on_stop() did not terminate cleanly"
                    )
        else:
            self._pool.join()

        self._pool = None
189
190 def on_terminate(self):
191 """Force terminate the pool."""

Callers

nothing calls this directly

Calls 6

EventMethod · 0.80
is_aliveMethod · 0.80
closeMethod · 0.45
startMethod · 0.45
joinMethod · 0.45
setMethod · 0.45

Tested by

no test coverage detected