(
test_client_factory: TestClientFactory,
)
| 116 | |
| 117 | |
| 118 | def test_websocket_endpoint_on_receive_json_binary( |
| 119 | test_client_factory: TestClientFactory, |
| 120 | ) -> None: |
| 121 | class WebSocketApp(WebSocketEndpoint): |
| 122 | encoding = "json" |
| 123 | |
| 124 | async def on_receive(self, websocket: WebSocket, data: str) -> None: |
| 125 | await websocket.send_json({"message": data}, mode="binary") |
| 126 | |
| 127 | client = test_client_factory(WebSocketApp) |
| 128 | with client.websocket_connect("/ws") as websocket: |
| 129 | websocket.send_json({"hello": "world"}, mode="binary") |
| 130 | data = websocket.receive_json(mode="binary") |
| 131 | assert data == {"message": {"hello": "world"}} |
| 132 | |
| 133 | |
| 134 | def test_websocket_endpoint_on_receive_text( |
nothing calls this directly
no test coverage detected