Main async loop - start server and handle connections.
(self)
| 247 | self._cleanup() |
| 248 | |
| 249 | async def _run_async(self): |
| 250 | """Main async loop - start server and handle connections.""" |
| 251 | # Remove socket if it exists |
| 252 | if os.path.exists(self.socket_path): |
| 253 | os.unlink(self.socket_path) |
| 254 | |
| 255 | # Start Unix socket server |
| 256 | self._server = await asyncio.start_unix_server( |
| 257 | self.handle_connection, |
| 258 | path=self.socket_path |
| 259 | ) |
| 260 | |
| 261 | # Make socket accessible |
| 262 | os.chmod(self.socket_path, 0o600) |
| 263 | |
| 264 | self.log.info("Dirty worker %s listening on %s", |
| 265 | self.pid, self.socket_path) |
| 266 | |
| 267 | # Start heartbeat task |
| 268 | heartbeat_task = asyncio.create_task(self._heartbeat_loop()) |
| 269 | |
| 270 | try: |
| 271 | async with self._server: |
| 272 | await self._server.serve_forever() |
| 273 | except asyncio.CancelledError: |
| 274 | pass |
| 275 | finally: |
| 276 | heartbeat_task.cancel() |
| 277 | try: |
| 278 | await heartbeat_task |
| 279 | except asyncio.CancelledError: |
| 280 | pass |
| 281 | |
| 282 | async def _heartbeat_loop(self): |
| 283 | """Periodically update heartbeat.""" |
no test coverage detected