Stop all workers.
(self, graceful=True)
| 1103 | self.kill_worker(pid, signal.SIGTERM) |
| 1104 | |
| 1105 | async def stop(self, graceful=True): |
| 1106 | """Stop all workers.""" |
| 1107 | # Cancel all consumer tasks |
| 1108 | for task in self.worker_consumers.values(): |
| 1109 | task.cancel() |
| 1110 | |
| 1111 | sig = signal.SIGTERM if graceful else signal.SIGQUIT |
| 1112 | limit = time.time() + self.cfg.dirty_graceful_timeout |
| 1113 | |
| 1114 | # Signal all workers |
| 1115 | for pid in list(self.workers.keys()): |
| 1116 | self.kill_worker(pid, sig) |
| 1117 | |
| 1118 | # Wait for workers to exit |
| 1119 | while self.workers and time.time() < limit: |
| 1120 | self.reap_workers() |
| 1121 | await asyncio.sleep(0.1) |
| 1122 | |
| 1123 | # Force kill remaining workers |
| 1124 | for pid in list(self.workers.keys()): |
| 1125 | self.kill_worker(pid, signal.SIGKILL) |
| 1126 | self.reap_workers() |
| 1127 | |
| 1128 | def _cleanup_sync(self): |
| 1129 | """Synchronous cleanup on exit.""" |