(send_body: bool)
@pytest.mark.anyio
@pytest.mark.parametrize("send_body", [True, False])
async def test_poll_for_disconnect_repeated(send_body: bool) -> None:
    """Poll for ``http.disconnect`` twice through a pass-through middleware.

    The wrapped app drains ``receive`` until it gets a message that is not
    ``http.request``, asserts that message is ``http.disconnect``, and then
    does the same thing a second time before replying with ``b"good!"``.
    The client stream defined below emits the disconnect only once and raises
    if it is pulled again, so for the test to pass the second poll must be
    satisfied without pulling from the stream.

    Args:
        send_body: When true, the client stream emits two ``http.request``
            chunks before the single ``http.disconnect``.
    """

    async def app_poll_disconnect(scope: Scope, receive: Receive, send: Send) -> None:
        # Two rounds of polling: each round skips request chunks and must end
        # on a disconnect message.
        for _ in range(2):
            while True:
                message = await receive()
                if message["type"] != "http.request":
                    break
            assert message["type"] == "http.disconnect"
        await Response(b"good!")(scope, receive, send)

    class MyMiddleware(BaseHTTPMiddleware):
        async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
            # Pure pass-through: hand the request straight to the next app.
            return await call_next(request)

    wrapped = MyMiddleware(app_poll_disconnect)

    http_scope = {"type": "http", "version": "3", "method": "GET", "path": "/"}

    async def client_messages() -> AsyncIterator[Message]:
        # The key point: exactly one http.disconnect message is ever emitted.
        pending: list[Message] = []
        if send_body:
            pending.append({"type": "http.request", "body": b"hello", "more_body": True})
            pending.append({"type": "http.request", "body": b"", "more_body": False})
        pending.append({"type": "http.disconnect"})
        for item in pending:
            yield item
        raise AssertionError("Should not be called, would hang")  # pragma: no cover

    recorded: list[Message] = []

    async def record(message: Message) -> None:
        recorded.append(message)

    # The bound ``__anext__`` of the async generator serves as the ASGI
    # ``receive`` callable: each await yields the next scripted message.
    await wrapped(http_scope, client_messages().__anext__, record)

    expected = [
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-length", b"5")],
        },
        {"type": "http.response.body", "body": b"good!", "more_body": True},
        {"type": "http.response.body", "body": b"", "more_body": False},
    ]
    assert recorded == expected
| 1216 | |
| 1217 | |
| 1218 | @pytest.mark.anyio |
nothing calls this directly
no test coverage detected