()
| 291 | response_sent = threading.Event() |
| 292 | |
| 293 | def server_handler(): |
| 294 | conn, _ = server_sock.accept() |
| 295 | try: |
| 296 | # Read request |
| 297 | msg = DirtyProtocol.read_message(conn) |
| 298 | # Send response |
| 299 | resp = make_response(msg["id"], {"result": "success"}) |
| 300 | DirtyProtocol.write_message(conn, resp) |
| 301 | response_sent.set() |
| 302 | finally: |
| 303 | conn.close() |
| 304 | |
| 305 | server_thread = threading.Thread(target=server_handler) |
| 306 | server_thread.start() |
nothing calls this directly
no test coverage detected