(test_client_factory: TestClientFactory)
| 195 | |
| 196 | |
def test_websocket_iter_json(test_client_factory: TestClientFactory) -> None:
    """Exercise ``WebSocket.iter_json()`` through a JSON echo round trip.

    The ASGI app accepts the connection and, for every item yielded by
    ``iter_json()``, sends it back wrapped under a ``"message"`` key. The
    test sends a single JSON payload and asserts on the wrapped reply.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        websocket = WebSocket(scope, receive=receive, send=send)
        await websocket.accept()
        # Echo each received JSON payload back to the client.
        async for data in websocket.iter_json():
            await websocket.send_json({"message": data})

    client = test_client_factory(app)
    with client.websocket_connect("/") as websocket:
        websocket.send_json({"hello": "world"})
        data = websocket.receive_json()
        assert data == {"message": {"hello": "world"}}
| 209 | |
| 210 | |
| 211 | def test_websocket_concurrency_pattern(test_client_factory: TestClientFactory) -> None: |
nothing calls this directly
no test coverage detected