MCPcopy
hub / github.com/encode/uvicorn / send_fragmented_req

Function send_fragmented_req

tests/protocols/test_http.py:944–960  ·  view source on GitHub ↗
(path: str)

Source from the content-addressed store, hash-verified

942 app = Response("Hello, world", media_type="text/plain")
943
944 def send_fragmented_req(path: str):
945 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
946 sock.connect(("127.0.0.1", unused_tcp_port))
947 d = (f"GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n").encode()
948 split = len(path) // 2
949 sock.sendall(d[:split])
950 time.sleep(0.01)
951 sock.sendall(d[split:])
952 resp = receive_all(sock)
953 # see https://github.com/kmonsoor/py-amqplib/issues/45
954 # we skip the error on bsd systems if python is too slow
955 try:
956 sock.shutdown(socket.SHUT_RDWR)
957 except Exception: # pragma: no cover
958 pass
959 sock.close()
960 return resp
961
962 config = Config(app=app, http="httptools", port=unused_tcp_port)
963 server = Server(config=config)

Callers 1

test_fragmentationFunction · 0.85

Calls 3

receive_allFunction · 0.85
shutdownMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected