()
| 148 | response_sent = threading.Event() |
| 149 | |
| 150 | def server_handler(): |
| 151 | conn, _ = server_sock.accept() |
| 152 | try: |
| 153 | msg = ControlProtocol.read_message(conn) |
| 154 | resp = make_response(msg["id"], response_data) |
| 155 | ControlProtocol.write_message(conn, resp) |
| 156 | response_sent.set() |
| 157 | finally: |
| 158 | conn.close() |
| 159 | |
| 160 | server_thread = threading.Thread(target=server_handler) |
| 161 | server_thread.start() |
nothing calls this directly
no test coverage detected