MCPcopy
hub / github.com/psycopg/psycopg / wait

Function wait

tests/pq/test_pgconn.py:26–46  ·  tests/pq/test_pgconn.py::wait
(
    conn: PGconn | PGcancelConn,
    poll_method: str = "connect_poll",
    return_on: pq.PollingStatus = pq.PollingStatus.OK,
    timeout: int | None = None,
)

Source from the content-addressed store, hash-verified

24
25
26def wait(
27 conn: PGconn | PGcancelConn,
28 poll_method: str = class="st">"connect_poll",
29 return_on: pq.PollingStatus = pq.PollingStatus.OK,
30 timeout: int | None = None,
31) -> None:
32 poll = getattr(conn, poll_method)
33 while True:
34 assert conn.status != pq.ConnStatus.BAD, conn.error_message
35
36 if (rv := poll()) == return_on:
37 return
38 elif rv == pq.PollingStatus.READING:
39 select([conn.socket], [], [], timeout)
40 elif rv == pq.PollingStatus.WRITING:
41 select([], [conn.socket], [], timeout)
42 else:
43 pytest.fail(fclass="st">"unexpected poll result: {rv}")
44 assert (
45 conn.status == pq.ConnStatus.OK
46 ), fclass="st">"unexpected connection status: {conn.error_message}"
47
48
49def test_connectdb(dsn):

Callers 3

test_connect_asyncFunction · 0.85
test_connect_async_badFunction · 0.85
test_reset_asyncFunction · 0.85

Calls 1

failMethod · 0.45

Tested by

no test coverage detected