()
| 637 | |
| 638 | # Start the server in background |
| 639 | async def run_briefly(): |
| 640 | # Remove existing socket |
| 641 | if os.path.exists(socket_path): |
| 642 | os.unlink(socket_path) |
| 643 | |
| 644 | worker._server = await asyncio.start_unix_server( |
| 645 | worker.handle_connection, |
| 646 | path=socket_path |
| 647 | ) |
| 648 | os.chmod(socket_path, 0o600) |
| 649 | |
| 650 | # Verify socket exists |
| 651 | assert os.path.exists(socket_path) |
| 652 | |
| 653 | # Close immediately |
| 654 | worker._server.close() |
| 655 | await worker._server.wait_closed() |
| 656 | |
| 657 | await run_briefly() |
| 658 |
nothing calls this directly
no test coverage detected