()
| 634 | |
@pytest.mark.anyio
async def test_streaming_response_on_client_disconnects() -> None:
    """Check that a streaming response stops itself once sending fails.

    The response is driven with a ``send`` callable that accepts the first
    ``http.response.body`` message and raises ``OSError`` on every later one,
    while the body generator would otherwise yield forever.  The call is
    expected to raise ``ClientDisconnect`` on its own, well before the
    one-second safety timeout fires, with exactly one chunk delivered.
    """
    chunks = bytearray()
    streamed = False

    async def receive_disconnect() -> Message:
        # The test does not expect ``receive`` to be awaited on this path;
        # raising makes any such call fail loudly instead of hanging.
        raise NotImplementedError

    async def send(message: Message) -> None:
        nonlocal streamed
        if message["type"] == "http.response.body":
            if not streamed:
                # First body message: record it and remember that we did.
                chunks.extend(message.get("body", b""))
                streamed = True
            else:
                # Any further body message fails, standing in for a
                # connection that can no longer be written to.
                raise OSError

    async def stream_indefinitely() -> AsyncGenerator[bytes, None]:
        while True:
            # Yield control to the event loop between chunks so the
            # generator never monopolises it.
            await anyio.sleep(0)
            yield b"chunk"

    stream = stream_indefinitely()
    response = StreamingResponse(content=stream)

    try:
        # The timeout is only a safety net: if it fires, the response kept
        # streaming instead of stopping when ``send`` failed.
        with anyio.move_on_after(1) as cancel_scope:
            with pytest.raises(ClientDisconnect):
                await response(
                    {"type": "http", "asgi": {"spec_version": "2.4"}},
                    receive_disconnect,
                    send,
                )
        assert not cancel_scope.cancel_called, "Content streaming should stop itself."
        assert chunks == b"chunk"
    finally:
        # Close the still-suspended generator even when an assertion above
        # fails, so a failing run does not also leak an unclosed generator.
        await stream.aclose()
| 666 | |
| 667 | |
| 668 | @pytest.mark.anyio |
nothing calls this directly
no test coverage detected