A simple websocket connection request and response helper
(url: str)
| 70 | |
| 71 | |
| 72 | async def wsresponse(url: str): |
| 73 | """ |
| 74 | A simple websocket connection request and response helper |
| 75 | """ |
| 76 | url = url.replace("ws:", "http:") |
| 77 | headers = { |
| 78 | "connection": "upgrade", |
| 79 | "upgrade": "websocket", |
| 80 | "Sec-WebSocket-Key": "x3JJHMbDL1EzLkh9GBhXDw==", |
| 81 | "Sec-WebSocket-Version": "13", |
| 82 | } |
| 83 | async with httpx.AsyncClient() as client: |
| 84 | return await client.get(url, headers=headers) |
| 85 | |
| 86 | |
| 87 | async def test_invalid_upgrade(ws_protocol_cls: WSProtocol, http_protocol_cls: HTTPProtocol, unused_tcp_port: int): |