MCPcopy
hub / github.com/encode/starlette / test_websocket_concurrency_pattern

Function test_websocket_concurrency_pattern

tests/test_websockets.py:211–238  ·  tests/test_websockets.py::test_websocket_concurrency_pattern
(test_client_factory: TestClientFactory)

Source from the content-addressed store, hash-verified

209
210
211def test_websocket_concurrency_pattern(test_client_factory: TestClientFactory) -> None:
212 stream_send: ObjectSendStream[MutableMapping[str, Any]]
213 stream_receive: ObjectReceiveStream[MutableMapping[str, Any]]
214 stream_send, stream_receive = anyio.create_memory_object_stream()
215
216 async def reader(websocket: WebSocket) -> None:
217 async with stream_send:
218 async for data in websocket.iter_json():
219 await stream_send.send(data)
220
221 async def writer(websocket: WebSocket) -> None:
222 async with stream_receive:
223 async for message in stream_receive:
224 await websocket.send_json(message)
225
226 async def app(scope: Scope, receive: Receive, send: Send) -> None:
227 websocket = WebSocket(scope, receive=receive, send=send)
228 await websocket.accept()
229 async with anyio.create_task_group() as task_group:
230 task_group.start_soon(reader, websocket)
231 await writer(websocket)
232 await websocket.close()
233
234 client = test_client_factory(app)
235 with client.websocket_connect(class="st">"/") as websocket:
236 websocket.send_json({class="st">"hello": class="st">"world"})
237 data = websocket.receive_json()
238 assert data == {class="st">"hello": class="st">"world"}
239
240
241def test_client_close(test_client_factory: TestClientFactory) -> None:

Callers

nothing calls this directly

Calls 4

send_jsonMethod · 0.95
receive_jsonMethod · 0.95
test_client_factoryFunction · 0.85
websocket_connectMethod · 0.80

Tested by

no test coverage detected