(listener: socket.socket)
| 92 | quit_event = threading.Event() |
| 93 | |
def socket_handler(listener: socket.socket) -> None:
    """Accept up to ``num`` connections on *listener* and answer each one.

    For every connection the handler sets ``ready_event``, accepts a
    client, passes the client socket to ``consume_socket``, optionally
    waits for ``block_send`` to be set, and then sends ``response`` and
    closes the connection.  ``quit_event`` is polled at every waiting
    point; once it is set the handler returns without sending anything
    further.

    ``num``, ``response``, ``block_send``, ``ready_event`` and
    ``quit_event`` come from the enclosing scope.
    """
    for _ in range(num):
        ready_event.set()

        # Accept with a timeout so that a set quit_event is noticed
        # instead of blocking in accept() indefinitely.
        listener.settimeout(LONG_TIMEOUT)
        while True:
            if quit_event.is_set():
                return
            try:
                sock = listener.accept()[0]
            except TimeoutError:
                continue
            break

        # The ``with`` block closes the accepted socket on every exit path,
        # including exceptions raised by consume_socket() or sendall().
        with sock:
            # presumably reads the client's request -- consume_socket is
            # defined elsewhere; confirm against its implementation.
            consume_socket(sock, quit_event=quit_event)
            if quit_event.is_set():
                return
            if block_send:
                # Hold the response until block_send is set, waking up
                # every LONG_TIMEOUT seconds to check for shutdown.
                while not block_send.wait(LONG_TIMEOUT):
                    if quit_event.is_set():
                        return
                # Reset the event so the next connection blocks again.
                block_send.clear()
            # sendall() keeps writing until the whole payload is sent;
            # send() may transmit only part of it.
            sock.sendall(response)
| 119 | |
| 120 | cls._start_server(socket_handler, quit_event=quit_event) |
| 121 | return ready_event |
no test coverage detected