()
| 500 | self.worker_queues[worker_pid] = queue |
| 501 | |
async def consumer():
    """Drain this worker's queue, running one request at a time.

    Each queue item is a ``(request, client_writer, future)`` tuple. The
    request is passed to ``self._execute_on_worker`` together with the
    enclosing scope's ``worker_pid``; ``future`` is then resolved with
    ``None`` on success or with the raised exception on failure.
    ``queue.task_done()`` is called once for every dequeued item.

    The loop runs while ``self.alive`` is truthy and stops when the task
    is cancelled. The cancellation is absorbed, so the task finishes
    normally rather than in the cancelled state.
    """
    while self.alive:
        try:
            request, client_writer, future = await queue.get()
        except asyncio.CancelledError:
            # Cancelled while idle: nothing was dequeued, so there is no
            # future to resolve and no task_done() to balance.
            break

        try:
            await self._execute_on_worker(
                worker_pid, request, client_writer
            )
        except asyncio.CancelledError:
            # Cancelled mid-request. The item has already left the queue,
            # so this is the only place its future can still be resolved;
            # leaving it pending would block whoever awaits it forever.
            if not future.done():
                future.cancel()
            break
        except Exception as e:
            # Hand the failure to whoever holds the future.
            if not future.done():
                future.set_exception(e)
        else:
            if not future.done():
                future.set_result(None)
        finally:
            # Runs on every path (including the break above) so that
            # queue.join() accounting stays balanced.
            queue.task_done()
| 519 | |
| 520 | task = asyncio.create_task(consumer()) |
| 521 | self.worker_consumers[worker_pid] = task |
nothing calls this directly
no test coverage detected