MCPcopy
hub / github.com/encode/uvicorn / test_fragmentation

Function test_fragmentation

tests/protocols/test_http.py:932–974  ·  tests/protocols/test_http.py::test_fragmentation
(unused_tcp_port: int)

Source from the content-addressed store, hash-verified

930
931@skip_if_no_httptools
932def test_fragmentation(unused_tcp_port: int):
933 def receive_all(sock: socket.socket):
934 chunks: list[bytes] = []
935 while True:
936 chunk = sock.recv(1024)
937 if not chunk:
938 break
939 chunks.append(chunk)
940 return bclass="st">"".join(chunks)
941
942 app = Response(class="st">"Hello, world", media_type=class="st">"text/plain")
943
944 def send_fragmented_req(path: str):
945 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
946 sock.connect((class="st">"127.0.0.1", unused_tcp_port))
947 d = (fclass="st">"GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n").encode()
948 split = len(path) // 2
949 sock.sendall(d[:split])
950 time.sleep(0.01)
951 sock.sendall(d[split:])
952 resp = receive_all(sock)
953 class="cm"># see https://github.com/kmonsoor/py-amqplib/issues/45
954 class="cm"># we skip the error on bsd systems if python is too slow
955 try:
956 sock.shutdown(socket.SHUT_RDWR)
957 except Exception: class="cm"># pragma: no cover
958 pass
959 sock.close()
960 return resp
961
962 config = Config(app=app, http=class="st">"httptools", port=unused_tcp_port)
963 server = Server(config=config)
964 t = threading.Thread(target=server.run)
965 t.daemon = True
966 t.start()
967 time.sleep(1) class="cm"># wait for uvicorn to start
968
969 path = class="st">"/?param=" + class="st">"q" * 10
970 response = send_fragmented_req(path)
971 bad_response = bclass="st">"HTTP/1.1 400 Bad Request"
972 assert bad_response != response[: len(bad_response)]
973 server.should_exit = True
974 t.join()
975
976
977async def test_huge_headers_h11protocol_failure():

Callers

nothing calls this directly

Calls 6

ResponseClass · 0.90
ConfigClass · 0.90
ServerClass · 0.90
send_fragmented_reqFunction · 0.85
startMethod · 0.80
joinMethod · 0.80

Tested by

no test coverage detected