(test_client_factory: TestClientFactory)
| 181 | |
| 182 | |
| 183 | def test_websocket_iter_bytes(test_client_factory: TestClientFactory) -> None: |
| 184 | async def app(scope: Scope, receive: Receive, send: Send) -> None: |
| 185 | websocket = WebSocket(scope, receive=receive, send=send) |
| 186 | await websocket.accept() |
| 187 | async for data in websocket.iter_bytes(): |
| 188 | await websocket.send_bytes(bclass="st">"Message was: " + data) |
| 189 | |
| 190 | client = test_client_factory(app) |
| 191 | with client.websocket_connect(class="st">"/") as websocket: |
| 192 | websocket.send_bytes(bclass="st">"Hello, world!") |
| 193 | data = websocket.receive_bytes() |
| 194 | assert data == bclass="st">"Message was: Hello, world!" |
| 195 | |
| 196 | |
| 197 | def test_websocket_iter_json(test_client_factory: TestClientFactory) -> None: |
nothing calls this directly
no test coverage detected