MCPcopy
hub / github.com/encode/uvicorn / test_max_concurrency

Function test_max_concurrency

tests/protocols/test_http.py:737–755  ·  tests/protocols/test_http.py::test_max_concurrency
(http_protocol_cls: type[HTTPProtocol])

Source from the content-addressed store, hash-verified

735
736
737async def test_max_concurrency(http_protocol_cls: type[HTTPProtocol]):
738 app = Response(class="st">"Hello, world", media_type=class="st">"text/plain")
739
740 protocol = get_connected_protocol(app, http_protocol_cls, limit_concurrency=1)
741 protocol.data_received(SIMPLE_GET_REQUEST)
742 await protocol.loop.run_one()
743 assert (
744 bclass="st">"\r\n".join(
745 [
746 bclass="st">"HTTP/1.1 503 Service Unavailable",
747 bclass="st">"content-type: text/plain; charset=utf-8",
748 bclass="st">"content-length: 19",
749 bclass="st">"connection: close",
750 bclass="st">"",
751 bclass="st">"Service Unavailable",
752 ]
753 )
754 == protocol.transport.buffer
755 )
756
757
758async def test_shutdown_during_request(http_protocol_cls: type[HTTPProtocol]):

Callers

nothing calls this directly

Calls 5

ResponseClass · 0.90
joinMethod · 0.80
get_connected_protocolFunction · 0.70
data_receivedMethod · 0.45
run_oneMethod · 0.45

Tested by

no test coverage detected