MCPcopy
hub / github.com/encode/starlette / test_streaming_response_on_client_disconnects

Function test_streaming_response_on_client_disconnects

tests/test_responses.py:636–665  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

634
@pytest.mark.anyio
async def test_streaming_response_on_client_disconnects() -> None:
    """Check that streaming ends with ``ClientDisconnect`` once ``send`` fails.

    The fake ``send`` records the first body message and raises ``OSError``
    on every later one. The test expects the response call to raise
    ``ClientDisconnect`` by itself, before the one-second safety timeout
    fires, with exactly one chunk recorded.
    """
    chunks = bytearray()
    streamed = False

    async def receive_disconnect() -> Message:
        # Awaiting ``receive`` would surface as an error other than the
        # ``ClientDisconnect`` this test expects.
        raise NotImplementedError

    async def send(message: Message) -> None:
        nonlocal streamed
        if message["type"] == "http.response.body":
            if not streamed:
                chunks.extend(message.get("body", b""))
                streamed = True
            else:
                # Second body message: simulate a failing send.
                raise OSError

    async def stream_indefinitely() -> AsyncGenerator[bytes, None]:
        # Never finishes by itself; only an error or cancellation can end it.
        while True:
            await anyio.sleep(0)
            yield b"chunk"

    stream = stream_indefinitely()
    response = StreamingResponse(content=stream)

    try:
        # The timeout is a safety net only; it must not be what ends the stream.
        with anyio.move_on_after(1) as cancel_scope:
            with pytest.raises(ClientDisconnect):
                await response({"type": "http", "asgi": {"spec_version": "2.4"}}, receive_disconnect, send)
        assert not cancel_scope.cancel_called, "Content streaming should stop itself."
        assert chunks == b"chunk"
    finally:
        # Close the generator even when an assertion above fails, so a failing
        # run does not leave an unfinished async generator behind.
        await stream.aclose()
666
667
668@pytest.mark.anyio

Callers

nothing calls this directly

Calls 2

StreamingResponse (Class) · 0.90
stream_indefinitely (Function) · 0.85

Tested by

no test coverage detected