(
test_client_factory: TestClientFactory,
)
| 76 | |
| 77 | |
def test_websocket_endpoint_on_receive_bytes(
    test_client_factory: TestClientFactory,
) -> None:
    """Exercise a bytes-encoded ``WebSocketEndpoint`` through the test client.

    Two things are checked:

    * a binary frame sent to ``/ws`` comes back prefixed with
      ``b"Message bytes was: "``;
    * sending a text frame on a fresh connection results in a
      ``RuntimeError`` surfacing from the connection block.
    """

    class WebSocketApp(WebSocketEndpoint):
        # The endpoint under test declares its payload encoding as bytes.
        encoding = "bytes"

        async def on_receive(self, websocket: WebSocket, data: bytes) -> None:
            reply = b"Message bytes was: " + data
            await websocket.send_bytes(reply)

    client = test_client_factory(WebSocketApp)

    # Matching encoding: the payload is echoed back with the prefix.
    with client.websocket_connect("/ws") as session:
        session.send_bytes(b"Hello, world!")
        echoed = session.receive_bytes()
        assert echoed == b"Message bytes was: Hello, world!"

    # Text frame on a second connection: a RuntimeError is expected.
    with pytest.raises(RuntimeError), client.websocket_connect("/ws") as session:
        session.send_text("Hello world")
| 96 | |
| 97 | |
| 98 | def test_websocket_endpoint_on_receive_json( |
nothing calls this directly
no test coverage detected