Wait until some connections are readable. Return the index of each readable connection in the original list.
(conns: Sequence[IPCBase], timeout: float | None = None)
| 374 | |
| 375 | |
| 376 | def ready_to_read(conns: Sequence[IPCBase], timeout: float | None = None) -> list[int]: |
| 377 | """Wait until some connections are readable. |
| 378 | |
| 379 | Return index of each readable connection in the original list. |
| 380 | """ |
| 381 | unread_messages = [i for i, conn in enumerate(conns) if conn.buffer] |
| 382 | if unread_messages: |
| 383 | # If we already have unread messages in the buffer, return those first. |
| 384 | return unread_messages |
| 385 | if sys.platform == "win32": |
| 386 | # Windows doesn't support select() on named pipes. Instead, start an overlapped |
| 387 | # ReadFile on each pipe (which internally creates an event via CreateEventW), |
| 388 | # then WaitForMultipleObjects on those events for efficient OS-level waiting. |
| 389 | # Any data consumed by the probe reads is stored into each connection's buffer |
| 390 | # so the subsequent read_bytes() call will find it via frame_from_buffer(). |
| 391 | WAIT_FAILED = 0xFFFFFFFF |
| 392 | pending: list[tuple[int, _winapi.Overlapped]] = [] |
| 393 | events: list[int] = [] |
| 394 | ready: list[int] = [] |
| 395 | |
| 396 | for i, conn in enumerate(conns): |
| 397 | try: |
| 398 | ov, err = _winapi.ReadFile(conn.connection, 1, overlapped=True) |
| 399 | except OSError: |
| 400 | # Broken/closed pipe. Mimic Linux behavior here: the caller will get |
| 401 | # the exception when it tries to read from this connection. |
| 402 | ready.append(i) |
| 403 | continue |
| 404 | if err == _winapi.ERROR_IO_PENDING: |
| 405 | events.append(ov.event) |
| 406 | pending.append((i, ov)) |
| 407 | else: |
| 408 | # Data was immediately available (err == 0 or ERROR_MORE_DATA) |
| 409 | _, err = ov.GetOverlappedResult(True) |
| 410 | data = ov.getbuffer() |
| 411 | if data: |
| 412 | conn.buffer.extend(data) |
| 413 | ready.append(i) |
| 414 | |
| 415 | # Wait only if nothing is immediately ready and there are pending operations |
| 416 | if not ready and events: |
| 417 | timeout_ms = int(timeout * 1000) if timeout is not None else _winapi.INFINITE |
| 418 | res = _winapi.WaitForMultipleObjects(events, False, timeout_ms) |
| 419 | if res == WAIT_FAILED: |
| 420 | for _, ov in pending: |
| 421 | ov.cancel() |
| 422 | raise IPCException(f"Failed to wait for connections: {_winapi.GetLastError()}") |
| 423 | |
| 424 | # Cancel all pending operations. CancelIoEx is asynchronous, so an |
| 425 | # operation may have completed before the cancel took effect. We then |
| 426 | # wait for all operations to finalize and check each result: completed |
| 427 | # reads get their data saved and are marked ready; cancelled ones are |
| 428 | # simply skipped. This avoids a race between checking if an operation |
| 429 | # is signaled and cancelling it. |
| 430 | for _, ov in pending: |
| 431 | ov.cancel() |
| 432 | for i, ov in pending: |
| 433 | try: |