After the response is sent, the receive stream is drained, so a subsequent `is_disconnected()` call reports the request as disconnected.
(test_client_factory: TestClientFactory)
| 267 | |
| 268 | |
def test_request_is_disconnected(test_client_factory: TestClientFactory) -> None:
    """
    Check `Request.is_disconnected()` at two points in the request lifecycle.

    The first check happens after the body has been read but before any
    response is sent, and must be False. The second happens once the response
    has been sent, and must be truthy.
    """
    state_after_response = None

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        nonlocal state_after_response

        req = Request(scope, receive)
        raw_body = await req.body()
        # Sample the connection state before anything is sent back.
        state_before_response = await req.is_disconnected()
        payload = {"body": raw_body.decode(), "disconnected": state_before_response}
        await JSONResponse(payload)(scope, receive, send)
        # Sample again now that the response has gone out; the outer test
        # reads this value through the enclosing scope.
        state_after_response = await req.is_disconnected()

    client = test_client_factory(app)
    result = client.post("/", content="foo")
    assert result.json() == {"body": "foo", "disconnected": False}
    assert state_after_response
| 290 | |
| 291 | |
| 292 | def test_request_state_object() -> None: |
nothing calls this directly
no test coverage detected