Get or create connection to a worker.
(self, worker_pid)
| 623 | return eligible_pids[idx % len(eligible_pids)] |
| 624 | |
| 625 | async def _get_worker_connection(self, worker_pid): |
| 626 | """Get or create connection to a worker.""" |
| 627 | if worker_pid in self.worker_connections: |
| 628 | return self.worker_connections[worker_pid] |
| 629 | |
| 630 | socket_path = self.worker_sockets.get(worker_pid) |
| 631 | if not socket_path: |
| 632 | raise DirtyError(f"No socket for worker {worker_pid}") |
| 633 | |
| 634 | # Wait for socket to be available |
| 635 | for _ in range(50): # 5 seconds max |
| 636 | if os.path.exists(socket_path): |
| 637 | break |
| 638 | await asyncio.sleep(0.1) |
| 639 | else: |
| 640 | raise DirtyError(f"Worker socket not ready: {socket_path}") |
| 641 | |
| 642 | reader, writer = await asyncio.open_unix_connection(socket_path) |
| 643 | self.worker_connections[worker_pid] = (reader, writer) |
| 644 | return reader, writer |
| 645 | |
| 646 | def _close_worker_connection(self, worker_pid): |
| 647 | """Close connection to a worker.""" |