(http_protocol_cls: type[HTTPProtocol])
| 640 | |
| 641 | |
| 642 | async def test_early_disconnect(http_protocol_cls: type[HTTPProtocol]): |
| 643 | got_disconnect_event = False |
| 644 | |
| 645 | async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable): |
| 646 | nonlocal got_disconnect_event |
| 647 | |
| 648 | while True: |
| 649 | message = await receive() |
| 650 | if message["type"] == "http.disconnect": |
| 651 | break |
| 652 | |
| 653 | got_disconnect_event = True |
| 654 | |
| 655 | protocol = get_connected_protocol(app, http_protocol_cls) |
| 656 | protocol.data_received(SIMPLE_POST_REQUEST) |
| 657 | protocol.eof_received() |
| 658 | protocol.connection_lost(None) |
| 659 | await protocol.loop.run_one() |
| 660 | assert got_disconnect_event |
| 661 | |
| 662 | |
| 663 | async def test_early_response(http_protocol_cls: type[HTTPProtocol]): |
nothing calls this directly
no test coverage detected