(http_protocol_cls: type[HTTPProtocol])
| 735 | |
| 736 | |
async def test_max_concurrency(http_protocol_cls: type[HTTPProtocol]) -> None:
    """Check that a request is rejected with a 503 when ``limit_concurrency=1``.

    The app would answer "Hello, world", but the bytes written to the
    transport are expected to be a plain-text ``503 Service Unavailable``
    response that also asks for the connection to be closed.
    """
    app = Response("Hello, world", media_type="text/plain")

    protocol = get_connected_protocol(app, http_protocol_cls, limit_concurrency=1)
    protocol.data_received(SIMPLE_GET_REQUEST)
    # Advance the loop attached to the protocol by one step before inspecting
    # what was written to the transport.
    await protocol.loop.run_one()

    # Status line and headers, a blank line, then the body; content-length
    # (19) matches len(b"Service Unavailable").
    expected_response = b"\r\n".join(
        [
            b"HTTP/1.1 503 Service Unavailable",
            b"content-type: text/plain; charset=utf-8",
            b"content-length: 19",
            b"connection: close",
            b"",
            b"Service Unavailable",
        ]
    )
    assert expected_response == protocol.transport.buffer
| 756 | |
| 757 | |
| 758 | async def test_shutdown_during_request(http_protocol_cls: type[HTTPProtocol]): |
nothing calls this directly
no test coverage detected