Gracefully stop the pool.
(self)
| 140 | return reg(loop) |
| 141 | |
| 142 | def on_stop(self): |
| 143 | """Gracefully stop the pool.""" |
| 144 | if self._pool is not None and self._pool._state in (RUN, CLOSE): |
| 145 | self._pool.close() |
| 146 | |
| 147 | # Keep firing timers (for heartbeats on async transports) while |
| 148 | # the pool drains. If not using an async transport, no hub exists |
| 149 | # and the timer thread is not created. |
| 150 | hub = get_event_loop() |
| 151 | if hub is not None: |
| 152 | shutdown_event = threading.Event() |
| 153 | |
| 154 | def fire_timers_loop(): |
| 155 | while not shutdown_event.is_set(): |
| 156 | try: |
| 157 | hub.fire_timers() |
| 158 | except Exception: |
| 159 | logger.warning( |
| 160 | "Exception in timer thread during prefork on_stop()", |
| 161 | exc_info=True, |
| 162 | ) |
| 163 | # 0.5 seconds was chosen as a balance between joining quickly |
| 164 | # after the pool join is complete and sleeping long enough to |
| 165 | # avoid excessive CPU usage. |
| 166 | time.sleep(0.5) |
| 167 | |
| 168 | timer_thread = threading.Thread( |
| 169 | target=fire_timers_loop, |
| 170 | daemon=True, |
| 171 | name="prefork-timer-shutdown", |
| 172 | ) |
| 173 | timer_thread.start() |
| 174 | |
| 175 | try: |
| 176 | self._pool.join() |
| 177 | finally: |
| 178 | shutdown_event.set() |
| 179 | timer_thread.join(timeout=1.0) |
| 180 | |
| 181 | if timer_thread.is_alive(): |
| 182 | logger.warning( |
| 183 | "Timer thread in prefork on_stop() did not terminate cleanly" |
| 184 | ) |
| 185 | else: |
| 186 | self._pool.join() |
| 187 | |
| 188 | self._pool = None |
| 189 | |
| 190 | def on_terminate(self): |
| 191 | """Force terminate the pool.""" |