Clean up after a worker exits. Saves the dead worker's app list to pending respawns so the replacement worker gets the same apps.
(self, pid)
| 1009 | self._cleanup_worker(pid) |
| 1010 | |
def _cleanup_worker(self, pid):
    """
    Release everything the arbiter tracks for an exited worker.

    The apps the worker was running are queued on
    ``self._pending_respawns`` so that the replacement worker gets the
    same apps.

    Steps, in order: close the worker connection, cancel and drop its
    consumer task, drop its queue, record its apps for respawn,
    unregister it, call ``cfg.dirty_worker_exit`` if the worker was
    still tracked, and finally remove its socket file.
    """
    self._close_worker_connection(pid)

    # Cancel the consumer task first, then forget about it.
    consumers = self.worker_consumers
    if pid in consumers:
        consumers[pid].cancel()
        del consumers[pid]

    # The per-worker queue is no longer needed.
    self.worker_queues.pop(pid, None)

    # Snapshot the app list BEFORE unregistering the worker
    # (presumably _unregister_worker discards the worker_app_map
    # entry -- confirm against its definition).
    app_map = self.worker_app_map
    orphaned_apps = list(app_map[pid]) if pid in app_map else []
    if orphaned_apps:
        self._pending_respawns.append(orphaned_apps)

    # The snapshot is taken, so unregistering is safe now.
    self._unregister_worker(pid)

    exited = self.workers.pop(pid, None)
    if exited:
        self.cfg.dirty_worker_exit(self, exited)

    # Remove the socket file if one was recorded and still exists.
    sock_path = self.worker_sockets.pop(pid, None)
    if not sock_path or not os.path.exists(sock_path):
        return
    try:
        os.unlink(sock_path)
    except OSError:
        # Best effort: failing to remove the file is ignored.
        pass
| 1046 | |
| 1047 | async def murder_workers(self): |
| 1048 | """Kill workers that have timed out.""" |