(reader, writer)
| 210 | socket_path = os.path.join(tmpdir, "test.sock") |
| 211 | |
| 212 | async def handler(reader, writer): |
| 213 | msg = await ControlProtocol.read_message_async(reader) |
| 214 | received.append(msg) |
| 215 | await ControlProtocol.write_message_async(writer, data) |
| 216 | writer.close() |
| 217 | await writer.wait_closed() |
| 218 | |
| 219 | server = await asyncio.start_unix_server(handler, path=socket_path) |
| 220 | |
no test coverage detected