(cancel_conn: PGcancelConn, *, timeout: float = 0.0)
| 111 | |
| 112 | |
| 113 | def _cancel(cancel_conn: PGcancelConn, *, timeout: float = 0.0) -> PQGenConn[None]: |
| 114 | deadline = monotonic() + timeout if timeout else 0.0 |
| 115 | while True: |
| 116 | if deadline and monotonic() > deadline: |
| 117 | raise e.CancellationTimeout("cancellation timeout expired") |
| 118 | |
| 119 | if (status := cancel_conn.poll()) == POLL_OK: |
| 120 | break |
| 121 | elif status == POLL_READING: |
| 122 | yield cancel_conn.socket, WAIT_R |
| 123 | elif status == POLL_WRITING: |
| 124 | yield cancel_conn.socket, WAIT_W |
| 125 | elif status == POLL_FAILED: |
| 126 | raise e.OperationalError( |
| 127 | f"cancellation failed: {cancel_conn.get_error_message()}" |
| 128 | ) |
| 129 | else: |
| 130 | raise e.InternalError(f"unexpected poll status: {status}") |
| 131 | |
| 132 | |
| 133 | def _execute(pgconn: PGconn) -> PQGen[list[PGresult]]: |
nothing calls this directly
no test coverage detected