MCPcopy
hub / github.com/psycopg/psycopg / test_check_connection

Function test_check_connection

tests/pool/test_pool_common_async.py:608–621  ·  view source on GitHub ↗
(pool_cls, aconn_cls, dsn, autocommit)

Source from the content-addressed store, hash-verified

606@pytest.mark.crdb_skip("pg_terminate_backend")
607@pytest.mark.parametrize("autocommit", [True, False])
608async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit):
609 conn = await aconn_cls.connect(dsn)
610 await set_autocommit(conn, autocommit)
611 await pool_cls.check_connection(conn)
612 assert not conn.closed
613 assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
614
615 async with await aconn_cls.connect(dsn) as conn2:
616 await conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid])
617
618 with pytest.raises(psycopg.OperationalError):
619 await pool_cls.check_connection(conn)
620
621 assert conn.closed
622
623
624async def test_check_init(pool_cls, dsn):

Callers

nothing calls this directly

Calls 4

set_autocommitFunction · 0.85
connectMethod · 0.45
check_connectionMethod · 0.45
executeMethod · 0.45

Tested by

no test coverage detected