hub / github.com/urllib3/urllib3 / socket_handler

Method socket_handler

test/test_ssltransport.py:113–126
(listener: socket.socket)

Source from the content-addressed store, hash-verified

111  quit_event = threading.Event()
112
113  def socket_handler(listener: socket.socket) -> None:
114      sock = listener.accept()[0]
115      try:
116          with self.server_context.wrap_socket(sock, server_side=True) as ssock:
117              request = consume_socket(
118                  ssock,
119                  quit_event=quit_event,
120              )
121              if not validate:
122                  return
123              validate_request(request)
124              ssock.send(sample_response())
125      except (ConnectionAbortedError, ConnectionResetError, BrokenPipeError):
126          return
127
128  chosen_handler = handler if handler else socket_handler
129  self._start_server(chosen_handler, quit_event=quit_event)
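
The handler above follows an accept, read, validate, respond pattern and treats a dropped connection as a normal exit. A minimal sketch of that pattern over plain TCP is shown here. It is not the urllib3 test code: the TLS wrapping is left out because it needs certificates, and read_request, make_handler and run_once are hypothetical names introduced only for illustration, with read_request standing in for consume_socket under the assumption that a request ends at a blank line.

```python
import socket
import threading


def read_request(sock: socket.socket, quit_event: threading.Event) -> bytes:
    """Hypothetical stand-in for consume_socket: read until a blank line."""
    data = b""
    while not quit_event.is_set() and not data.endswith(b"\r\n\r\n"):
        chunk = sock.recv(4096)
        if not chunk:  # peer closed the connection
            break
        data += chunk
    return data


def make_handler(quit_event: threading.Event, validate: bool = True):
    """Build a handler closure, mirroring the nested socket_handler."""

    def socket_handler(listener: socket.socket) -> None:
        sock = listener.accept()[0]
        try:
            # Plain TCP here; the original wraps sock in a server-side
            # SSLContext before reading from it.
            with sock:
                request = read_request(sock, quit_event)
                if not validate:
                    return
                # Assumed validation rule, for illustration only.
                assert request.startswith(b"GET ")
                sock.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
        except (ConnectionAbortedError, ConnectionResetError, BrokenPipeError):
            # The client went away mid-exchange; nothing left to do.
            return

    return socket_handler


def run_once() -> bytes:
    """Start the handler in a thread, send one request, return the reply."""
    quit_event = threading.Event()
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    server = threading.Thread(target=make_handler(quit_event), args=(listener,))
    server.start()
    try:
        with socket.create_connection(listener.getsockname()) as client:
            client.sendall(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
            reply = read_request(client, quit_event)
    finally:
        quit_event.set()  # lets a still-reading handler stop looping
        server.join(timeout=5)
        listener.close()
    return reply
```

Calling run_once() drives one full exchange on the loopback interface. Passing validate=False to make_handler makes the handler return right after reading, which matches the early return at line 121 of the listing.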

Callers

nothing calls this directly

Calls 5

consume_socket · Function · 0.90
validate_request · Function · 0.85
sample_response · Function · 0.85
wrap_socket · Method · 0.80
send · Method · 0.45

Tested by

no test coverage detected