MCPcopy Index your code
hub / github.com/python/mypy / __enter__

Method __enter__

mypy/ipc.py:278–308  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

276 self.sock.settimeout(timeout)
277
278 def __enter__(self) -> IPCServer:
279 if sys.platform == "win32":
280 # NOTE: It is theoretically possible that this will hang forever if the
281 # client never connects, though this can be "solved" by killing the server
282 try:
283 ov = _winapi.ConnectNamedPipe(self.connection, overlapped=True)
284 except OSError as e:
285 # Don't raise if the client already exists, or the client already connected
286 if e.winerror not in (_winapi.ERROR_PIPE_CONNECTED, _winapi.ERROR_NO_DATA):
287 raise
288 else:
289 try:
290 timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
291 res = _winapi.WaitForSingleObject(ov.event, timeout)
292 assert res == _winapi.WAIT_OBJECT_0
293 except BaseException:
294 ov.cancel()
295 _winapi.CloseHandle(self.connection)
296 raise
297 _, err = ov.GetOverlappedResult(True)
298 assert err == 0
299 else:
300 try:
301 self.connection, _ = self.sock.accept()
302 # This is already default on Linux, we set same buffer size
303 # for macOS vs Linux consistency to simplify reasoning.
304 self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, MAX_READ)
305 self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, MAX_READ)
306 except TimeoutError as e:
307 raise IPCException("The socket timed out") from e
308 return self
309
310 def __exit__(
311 self,

Callers

nothing calls this directly

Calls 3

intClass · 0.85
IPCExceptionClass · 0.85
acceptMethod · 0.45

Tested by

no test coverage detected