hub / github.com/psycopg/psycopg / test_tpc_on_pg_connection

Function test_tpc_on_pg_connection

tests/crdb/test_no_crdb.py:14–34
(conn, tpc)

Source from the content-addressed store, hash-verified

def test_tpc_on_pg_connection(conn, tpc):
    xid = conn.xid(1, "gtrid", "bqual")
    assert conn.info.transaction_status == TransactionStatus.IDLE

    conn.tpc_begin(xid)
    assert conn.info.transaction_status == TransactionStatus.INTRANS

    cur = conn.cursor()
    cur.execute("insert into test_tpc values ('test_tpc_commit')")
    assert tpc.count_xacts() == 0
    assert tpc.count_test_records() == 0

    conn.tpc_prepare()
    assert conn.info.transaction_status == TransactionStatus.IDLE
    assert tpc.count_xacts() == 1
    assert tpc.count_test_records() == 0

    conn.tpc_commit()
    assert conn.info.transaction_status == TransactionStatus.IDLE
    assert tpc.count_xacts() == 0
    assert tpc.count_test_records() == 1
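
The sequence of states the test asserts (idle, in transaction, prepared but not yet visible, committed and visible) can be illustrated with a toy in-memory model. This is only a sketch for reading the assertions: `Status`, `ToyTpcConnection`, and all of its attributes are hypothetical names invented here, not psycopg code, and the model assumes a prepare always precedes the commit, which the real API does not require.

```python
from enum import Enum, auto


class Status(Enum):
    """Hypothetical stand-in for the two transaction states the test checks."""
    IDLE = auto()
    INTRANS = auto()


class ToyTpcConnection:
    """Toy model of a two-phase commit flow; not how psycopg is implemented."""

    def __init__(self):
        self.status = Status.IDLE
        self.pending = []     # rows written inside the open transaction
        self.prepared = {}    # xid -> rows held by a prepared transaction
        self.committed = []   # rows visible to other sessions
        self._xid = None

    def tpc_begin(self, xid):
        if self.status is not Status.IDLE:
            raise RuntimeError("tpc_begin requires an idle connection")
        self._xid = xid
        self.status = Status.INTRANS

    def insert(self, row):
        if self.status is not Status.INTRANS:
            raise RuntimeError("no transaction in progress")
        self.pending.append(row)

    def tpc_prepare(self):
        # Phase one: park the work under the xid and release the session.
        if self.status is not Status.INTRANS:
            raise RuntimeError("nothing to prepare")
        self.prepared[self._xid] = self.pending
        self.pending = []
        self.status = Status.IDLE

    def tpc_commit(self):
        # Phase two: make the prepared rows visible and forget the xid.
        if self._xid not in self.prepared:
            raise RuntimeError("this toy model only commits prepared work")
        self.committed.extend(self.prepared.pop(self._xid))
        self._xid = None


# Walk through the same steps as the test, using a plain string as the xid.
conn = ToyTpcConnection()
conn.tpc_begin("gtrid")
conn.insert("test_tpc_commit")
conn.tpc_prepare()
conn.tpc_commit()
```

In this model, `len(conn.prepared)` plays the role of `tpc.count_xacts()` and `len(conn.committed)` plays the role of `tpc.count_test_records()`: the first is 1 only between prepare and commit, and the second becomes 1 only after commit.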

Callers

nothing calls this directly

Calls 8

xid · Method · 0.80
count_xacts · Method · 0.80
count_test_records · Method · 0.80
tpc_begin · Method · 0.45
cursor · Method · 0.45
execute · Method · 0.45
tpc_prepare · Method · 0.45
tpc_commit · Method · 0.45

Tested by

no test coverage detected