(
sock: SSLTransport | socket.socket,
chunks: int = 65536,
quit_event: threading.Event | None = None,
)
| 18 | |
| 19 | |
def consume_socket(
    sock: SSLTransport | socket.socket,
    chunks: int = 65536,
    quit_event: threading.Event | None = None,
) -> bytearray:
    """Read from ``sock`` until a blank-line terminator, EOF, or a quit request.

    The socket timeout is set to ``LONG_TIMEOUT`` and data is read in pieces
    of at most ``chunks`` bytes. Reading stops when any of these holds:

    * the accumulated data ends with ``b"\\r\\n\\r\\n"``;
    * ``recv`` returns an empty bytes object (for a standard socket this
      means the peer closed the connection, so no more data can arrive);
    * ``quit_event`` is provided and has been set.

    A ``TimeoutError`` from ``recv`` is not fatal: the loop simply retries,
    which gives ``quit_event`` a chance to be observed between reads.

    :param sock: Socket-like object providing ``settimeout`` and ``recv``.
    :param chunks: Maximum number of bytes requested per ``recv`` call.
    :param quit_event: Optional event; once set, reading stops before the
        next ``recv`` call.
    :return: Everything received so far, including the terminator if seen.
    """
    consumed = bytearray()
    sock.settimeout(LONG_TIMEOUT)
    while True:
        if quit_event is not None and quit_event.is_set():
            break
        try:
            b = sock.recv(chunks)
        except TimeoutError:
            # Nothing arrived in time; loop again so quit_event is re-checked.
            continue
        assert isinstance(b, bytes)
        if not b:
            # Empty read signals end-of-stream. Without this guard the loop
            # would call recv() forever, because b"" never ends with the
            # terminator.
            break
        consumed += b
        # Test the accumulated buffer rather than the latest chunk so that a
        # terminator split across two recv() calls is still detected.
        if consumed.endswith(b"\r\n\r\n"):
            break
    return consumed
| 39 | |
| 40 | |
| 41 | class SocketDummyServerTestCase: |
no test coverage detected