Main async loop - start server, manage workers.
(self)
| 341 | self._server.close() |
| 342 | |
async def _run_async(self):
    """Main async loop - start server, manage workers.

    Binds a Unix socket server at ``self.socket_path`` with
    ``self.handle_client`` as the connection callback, spawns the
    initial workers through ``self.manage_workers()``, starts
    ``self._worker_monitor()`` as a background task and then serves
    until the server is closed or this coroutine is cancelled.

    On the way out the monitor task is cancelled and awaited, and
    ``self.stop()`` is always awaited afterwards - even when the
    monitor task had already failed with its own exception, which is
    then propagated to the caller once shutdown has run.
    """
    self._loop = asyncio.get_running_loop()

    # Remove a stale socket file, if any. Attempting the unlink directly
    # (instead of checking os.path.exists() first) avoids a
    # check-then-act race and also clears a dangling symlink, which
    # exists() reports as missing but which would still block the bind.
    try:
        os.unlink(self.socket_path)
    except FileNotFoundError:
        pass

    # Start Unix socket server for HTTP workers
    self._server = await asyncio.start_unix_server(
        self.handle_client,
        path=self.socket_path
    )

    # Restrict the socket to its owner (read/write for the user only)
    os.chmod(self.socket_path, 0o600)

    self.log.info("Dirty arbiter listening on %s", self.socket_path)

    # Spawn initial workers
    await self.manage_workers()

    # Start periodic tasks
    monitor_task = asyncio.create_task(self._worker_monitor())

    try:
        async with self._server:
            await self._server.serve_forever()
    except (asyncio.CancelledError, RuntimeError):
        # RuntimeError raised when server.close() is called during serve_forever()
        pass
    finally:
        try:
            monitor_task.cancel()
            try:
                await monitor_task
            except asyncio.CancelledError:
                pass
        finally:
            # Run shutdown unconditionally: if the monitor task had
            # already crashed, awaiting it re-raises that error, and
            # without this inner finally stop() would be skipped.
            await self.stop()
| 382 | |
| 383 | async def _worker_monitor(self): |
| 384 | """Periodically check worker health and manage pool.""" |
no test coverage detected