Start a consumer task for a worker's request queue.
(self, worker_pid)
| 495 | await DirtyProtocol.write_message_async(client_writer, response) |
| 496 | |
async def _start_worker_consumer(self, worker_pid):
    """Start a consumer task for a worker's request queue.

    Creates a fresh ``asyncio.Queue`` for ``worker_pid``, registers it in
    ``self.worker_queues`` and spawns a background task (registered in
    ``self.worker_consumers``) that drains the queue one item at a time.

    Each queue item is a ``(request, client_writer, future)`` tuple. The
    request is passed to ``self._execute_on_worker`` and the outcome is
    reported through ``future``:

    * success          -> ``future.set_result(None)``
    * ``Exception``    -> ``future.set_exception(exc)``
    * task cancelled   -> ``future.cancel()``

    Args:
        worker_pid: Key under which the queue and the consumer task are
            stored; also forwarded to ``_execute_on_worker``.
    """
    queue = asyncio.Queue()
    self.worker_queues[worker_pid] = queue

    async def consumer():
        # Requests are processed strictly one after another: the next
        # item is only fetched once the current one has finished.
        while self.alive:
            try:
                request, client_writer, future = await queue.get()
            except asyncio.CancelledError:
                # Cancelled while idle: nothing is in flight, just stop.
                break

            try:
                await self._execute_on_worker(
                    worker_pid, request, client_writer
                )
                if not future.done():
                    future.set_result(None)
            except asyncio.CancelledError:
                # Cancelled while a request was in flight. Without this
                # the future would stay pending forever and whoever is
                # awaiting it would hang, so propagate the cancellation
                # to the waiter before shutting the consumer down.
                if not future.done():
                    future.cancel()
                break
            except Exception as e:
                if not future.done():
                    future.set_exception(e)
            finally:
                # Always balance the get() above, whatever the outcome.
                queue.task_done()

    task = asyncio.create_task(consumer())
    self.worker_consumers[worker_pid] = task
| 522 | |
| 523 | async def _execute_on_worker(self, worker_pid, request, client_writer): |
| 524 | """ |
no outgoing calls