MCPcopy
hub / github.com/encode/starlette / test_request_stream_then_body

Function test_request_stream_then_body

tests/test_requests.py:164–180  ·  tests/test_requests.py::test_request_stream_then_body
(test_client_factory: TestClientFactory)

Source from the content-addressed store, hash-verified

162
163
164def test_request_stream_then_body(test_client_factory: TestClientFactory) -> None:
165 async def app(scope: Scope, receive: Receive, send: Send) -> None:
166 request = Request(scope, receive)
167 chunks = bclass="st">""
168 async for chunk in request.stream(): class="cm"># pragma: no branch
169 chunks += chunk
170 try:
171 body = await request.body()
172 except RuntimeError:
173 body = bclass="st">"<stream consumed>"
174 response = JSONResponse({class="st">"body": body.decode(), class="st">"stream": chunks.decode()})
175 await response(scope, receive, send)
176
177 client = test_client_factory(app)
178
179 response = client.post(class="st">"/", data=class="st">"abc") class="cm"># type: ignore
180 assert response.json() == {class="st">"body": class="st">"<stream consumed>", class="st">"stream": class="st">"abc"}
181
182
183def test_request_json(test_client_factory: TestClientFactory) -> None:

Callers

nothing calls this directly

Calls 3

test_client_factoryFunction · 0.85
jsonMethod · 0.80
postMethod · 0.45

Tested by

no test coverage detected