MCPcopy
hub / github.com/encode/starlette / test_websocket_blocking_receive

Function test_websocket_blocking_receive

tests/test_testclient.py:233–255  ·  tests/test_testclient.py::test_websocket_blocking_receive
(test_client_factory: TestClientFactory)

Source from the content-addressed store, hash-verified

231
232
def test_websocket_blocking_receive(test_client_factory: TestClientFactory) -> None:
    """Check that a blocked server-side receive does not stall a concurrent send.

    The ASGI app accepts the websocket, schedules ``respond`` in an anyio task
    group, and then awaits ``receive_json()``. The client never sends anything,
    so that receive stays pending; the test passes only if the message sent by
    ``respond`` still reaches the client.
    """

    def app(scope: Scope) -> ASGIInstance:
        async def respond(websocket: WebSocket) -> None:
            # Runs concurrently with the pending receive in ``asgi``.
            await websocket.send_json({"message": "test"})

        async def asgi(receive: Receive, send: Send) -> None:
            websocket = WebSocket(scope, receive=receive, send=send)
            await websocket.accept()
            async with anyio.create_task_group() as task_group:
                task_group.start_soon(respond, websocket)
                try:
                    # this will block as the client does not send us data
                    # it should not prevent `respond` from executing though
                    await websocket.receive_json()
                except WebSocketDisconnect:
                    # NOTE(review): presumably raised once the client leaves
                    # its `with` block and the connection closes; it is
                    # deliberately ignored here.
                    pass

        return asgi

    client = test_client_factory(app)  # type: ignore
    with client.websocket_connect("/") as websocket:
        data = websocket.receive_json()
        assert data == {"message": "test"}
256
257
258def test_websocket_not_block_on_close(test_client_factory: TestClientFactory) -> None:

Callers

nothing calls this directly

Calls 3

receive_json — Method · 0.95
test_client_factory — Function · 0.85
websocket_connect — Method · 0.80

Tested by

no test coverage detected