(self)
| 420 | await communicator.wait() |
| 421 | |
async def test_disconnect_with_body(self):
    """
    Queue an ``http.request`` message carrying a body, then an
    ``http.disconnect`` message, and assert that awaiting the
    communicator's output raises ``asyncio.TimeoutError``.
    """
    asgi_app = get_asgi_application()
    root_scope = self.async_request_factory._base_scope(path="/")
    client = ApplicationCommunicator(asgi_app, root_scope)
    # Order matters: the body message is queued ahead of the disconnect.
    queued_messages = (
        {"type": "http.request", "body": b"some body"},
        {"type": "http.disconnect"},
    )
    for message in queued_messages:
        await client.send_input(message)
    with self.assertRaises(asyncio.TimeoutError):
        await client.receive_output()
| 430 | |
| 431 | async def test_assert_in_listen_for_disconnect(self): |
| 432 | application = get_asgi_application() |
nothing calls this directly
no test coverage detected