(test_client_factory: TestClientFactory)
| 231 | |
| 232 | |
| 233 | def test_websocket_blocking_receive(test_client_factory: TestClientFactory) -> None: |
| 234 | def app(scope: Scope) -> ASGIInstance: |
| 235 | async def respond(websocket: WebSocket) -> None: |
| 236 | await websocket.send_json({class="st">"message": class="st">"test"}) |
| 237 | |
| 238 | async def asgi(receive: Receive, send: Send) -> None: |
| 239 | websocket = WebSocket(scope, receive=receive, send=send) |
| 240 | await websocket.accept() |
| 241 | async with anyio.create_task_group() as task_group: |
| 242 | task_group.start_soon(respond, websocket) |
| 243 | try: |
| 244 | class="cm"># this will block as the client does not send us data |
| 245 | class="cm"># it should not prevent `respond` from executing though |
| 246 | await websocket.receive_json() |
| 247 | except WebSocketDisconnect: |
| 248 | pass |
| 249 | |
| 250 | return asgi |
| 251 | |
| 252 | client = test_client_factory(app) class="cm"># type: ignore |
| 253 | with client.websocket_connect(class="st">"/") as websocket: |
| 254 | data = websocket.receive_json() |
| 255 | assert data == {class="st">"message": class="st">"test"} |
| 256 | |
| 257 | |
| 258 | def test_websocket_not_block_on_close(test_client_factory: TestClientFactory) -> None: |
nothing calls this directly
no test coverage detected