(
self,
exc_ty: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: TracebackType | None = None,
)
| 308 | return self |
| 309 | |
| 310 | def __exit__( |
| 311 | self, |
| 312 | exc_ty: type[BaseException] | None = None, |
| 313 | exc_val: BaseException | None = None, |
| 314 | exc_tb: TracebackType | None = None, |
| 315 | ) -> None: |
| 316 | if sys.platform == "win32": |
| 317 | try: |
| 318 | # Wait for the client to finish reading the last write before disconnecting |
| 319 | if not FlushFileBuffers(self.connection): |
| 320 | raise IPCException( |
| 321 | "Failed to flush NamedPipe buffer, maybe the client hung up?" |
| 322 | ) |
| 323 | finally: |
| 324 | DisconnectNamedPipe(self.connection) |
| 325 | else: |
| 326 | self.close() |
| 327 | |
| 328 | def cleanup(self) -> None: |
| 329 | if sys.platform == "win32": |
nothing calls this directly
no test coverage detected