MCPcopy Index your code
hub / github.com/python/mypy / __init__

Method __init__

mypy/ipc.py:189–227  ·  view source on GitHub ↗
(self, name: str, timeout: float | None)

Source from the content-addressed store, hash-verified

187 """The client side of an IPC connection."""
188
def __init__(self, name: str, timeout: float | None) -> None:
    """Open the client end of the IPC connection called *name*.

    On Windows this waits for the named pipe to become available, opens it
    for overlapped read/write access and switches it to message read mode.
    On every other platform it connects an ``AF_UNIX`` socket to *name*.

    Args:
        name: Named-pipe path (Windows) or the address handed to
            ``socket.connect`` (other platforms).
        timeout: Timeout in seconds. On Windows a falsy ``self.timeout``
            (presumably stored by the base-class initializer -- confirm
            there) means "wait forever"; elsewhere the value is passed to
            ``socket.settimeout`` as is.

    Raises:
        IPCException: On Windows, when the pipe does not exist, waiting
            for it times out, or the pipe is busy.
        OSError: Any other failure is propagated unchanged. Whatever
            handle or socket was already opened is closed first so a
            failed constructor does not leak it.
    """
    super().__init__(name, timeout)
    if sys.platform == "win32":
        # WaitNamedPipe expects milliseconds.
        timeout = int(self.timeout * 1000) if self.timeout else _winapi.NMPWAIT_WAIT_FOREVER
        try:
            _winapi.WaitNamedPipe(self.name, timeout)
        except FileNotFoundError as e:
            raise IPCException(f"The NamedPipe at {self.name} was not found.") from e
        except OSError as e:
            if e.winerror == _winapi.ERROR_SEM_TIMEOUT:
                raise IPCException("Timed out waiting for connection.") from e
            raise
        try:
            self.connection = _winapi.CreateFile(
                self.name,
                _winapi.GENERIC_READ | _winapi.GENERIC_WRITE,
                0,
                _winapi.NULL,
                _winapi.OPEN_EXISTING,
                _winapi.FILE_FLAG_OVERLAPPED,
                _winapi.NULL,
            )
        except OSError as e:
            if e.winerror == _winapi.ERROR_PIPE_BUSY:
                raise IPCException("The connection is busy.") from e
            raise
        try:
            _winapi.SetNamedPipeHandleState(
                self.connection, _winapi.PIPE_READMODE_MESSAGE, None, None
            )
        except BaseException:
            # The handle is already open; release it before propagating,
            # since the caller never receives an object to close.
            _winapi.CloseHandle(self.connection)
            raise
    else:
        self.connection = socket.socket(socket.AF_UNIX)
        try:
            # This is already default on Linux, we set same buffer size
            # for macOS vs Linux consistency to simplify reasoning.
            self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, MAX_READ)
            self.connection.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, MAX_READ)
            self.connection.settimeout(timeout)
            self.connection.connect(name)
        except BaseException:
            # Configuration or connect failed: close the socket instead of
            # leaving it to the garbage collector, then re-raise as is.
            self.connection.close()
            raise
228
229 def __enter__(self) -> IPCClient:
230 return self

Callers

nothing calls this directly

Calls 4

int · Class · 0.85
IPCException · Class · 0.85
connect · Method · 0.80
__init__ · Method · 0.45

Tested by

no test coverage detected