MCPcopy
hub / github.com/urllib3/urllib3 / consume_socket

Function consume_socket

dummyserver/testcase.py:20–38  ·  view source on GitHub ↗
(
    sock: SSLTransport | socket.socket,
    chunks: int = 65536,
    quit_event: threading.Event | None = None,
)

Source from the content-addressed store, hash-verified

18
19
20def consume_socket(
21 sock: SSLTransport | socket.socket,
22 chunks: int = 65536,
23 quit_event: threading.Event | None = None,
24) -> bytearray:
25 consumed = bytearray()
26 sock.settimeout(LONG_TIMEOUT)
27 while True:
28 if quit_event and quit_event.is_set():
29 break
30 try:
31 b = sock.recv(chunks)
32 except TimeoutError:
33 continue
34 assert isinstance(b, bytes)
35 consumed += b
36 if b.endswith(b"\r\n\r\n"):
37 break
38 return consumed
39
40
41class SocketDummyServerTestCase:

Callers 14

socket_handlerMethod · 0.90
shutdown_handlerMethod · 0.90
socket_handlerMethod · 0.90
socket_handlerMethod · 0.90
consume_ssl_socketMethod · 0.90
http_socket_handlerMethod · 0.90

Calls 2

settimeoutMethod · 0.45
recvMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…