MCPcopy Index your code
hub / github.com/python/mypy / write_bytes

Method write_bytes

mypy/ipc.py:150–176  ·  view source on GitHub ↗

Write to an IPC connection.

(self, data: bytes)

Source from the content-addressed store, hash-verified

148 self.write_bytes(data.encode("utf-8"))
149
150 def write_bytes(self, data: bytes) -> None:
151 """Write to an IPC connection."""
152
153 # Frame the data by adding fixed size header.
154 encoded_data = struct.pack("!L", len(data)) + data
155
156 if sys.platform == "win32":
157 try:
158 ov, err = _winapi.WriteFile(self.connection, encoded_data, overlapped=True)
159 try:
160 if err == _winapi.ERROR_IO_PENDING:
161 timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
162 res = _winapi.WaitForSingleObject(ov.event, timeout)
163 if res != _winapi.WAIT_OBJECT_0:
164 raise IPCException(f"Bad result from I/O wait: {res}")
165 elif err != 0:
166 raise IPCException(f"Failed writing to pipe with error: {err}")
167 except BaseException:
168 ov.cancel()
169 raise
170 bytes_written, err = ov.GetOverlappedResult(True)
171 assert err == 0, err
172 assert bytes_written == len(encoded_data)
173 except OSError as e:
174 raise IPCException(f"Failed to write with error: {e.winerror}") from e
175 else:
176 self.connection.sendall(encoded_data)
177
178 def close(self) -> None:
179 if sys.platform == "win32":

Callers 3

writeMethod · 0.95
connectFunction · 0.80
sendFunction · 0.80

Calls 3

lenFunction · 0.85
intClass · 0.85
IPCExceptionClass · 0.85

Tested by

no test coverage detected