hub / github.com/psycopg/psycopg / test_xid_roundtrip

Method test_xid_roundtrip

tests/test_tpc_async.py:211–227
(self, aconn_cls, aconn, dsn, tpc, fid, gtrid, bqual)

Source from the content-addressed store, hash-verified

209         ],
210     )
211     async def test_xid_roundtrip(self, aconn_cls, aconn, dsn, tpc, fid, gtrid, bqual):
212         xid = aconn.xid(fid, gtrid, bqual)
213         await aconn.tpc_begin(xid)
214         await aconn.tpc_prepare()
215         await aconn.close()
216
217         async with await aconn_cls.connect(dsn) as aconn:
218             xids = [
219                 x for x in await aconn.tpc_recover() if x.database == aconn.info.dbname
220             ]
221             assert len(xids) == 1
222             xid = xids[0]
223             await aconn.tpc_rollback(xid)
224
225         assert xid.format_id == fid
226         assert xid.gtrid == gtrid
227         assert xid.bqual == bqual
228
229     # 199 is PostgreSQL's limit in transaction id length
230     @pytest.mark.parametrize("tid", ["", "hello, world!", "x" * 199])
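
The test above checks that the (format_id, gtrid, bqual) triple survives a prepare on one connection and a recover on another. PostgreSQL stores a prepared transaction under a single string id, so a client has to pack the three fields into that string and parse them back. The following is a minimal sketch of one such packing, assuming a `<format_id>_<base64 gtrid>_<base64 bqual>` layout. `encode_tid` and `decode_tid` are hypothetical names used for illustration only, not psycopg functions, and the layout is an assumption rather than something shown in this listing.

```python
from __future__ import annotations

import base64
import re

# Hypothetical helpers (not part of psycopg): they illustrate how an XA-style
# triple could be packed into one transaction id string and recovered later.
# Standard base64 never emits "_", so "_" is a safe field separator.
_TID_RE = re.compile(r"^(\d+)_([^_]*)_([^_]*)$")


def encode_tid(format_id: int, gtrid: str, bqual: str) -> str:
    """Pack (format_id, gtrid, bqual) into a single string id."""
    egtrid = base64.b64encode(gtrid.encode()).decode()
    ebqual = base64.b64encode(bqual.encode()).decode()
    return f"{format_id}_{egtrid}_{ebqual}"


def decode_tid(tid: str) -> tuple[int, str, str]:
    """Recover the triple from a string produced by encode_tid."""
    m = _TID_RE.match(tid)
    if m is None:
        raise ValueError(f"not an encoded transaction id: {tid!r}")
    format_id, egtrid, ebqual = m.groups()
    return (
        int(format_id),
        base64.b64decode(egtrid).decode(),
        base64.b64decode(ebqual).decode(),
    )


if __name__ == "__main__":
    tid = encode_tid(42, "gtrid", "bqual")
    # The decoded triple matches the original, mirroring the three asserts
    # at the end of the test.
    print(decode_tid(tid) == (42, "gtrid", "bqual"))
```

Because base64 expands its input, an id packed this way grows faster than the raw fields, which matters given the 199-character limit mentioned in the listing.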

Callers

nothing calls this directly

Calls 7

xid · Method · 0.80
tpc_begin · Method · 0.45
tpc_prepare · Method · 0.45
close · Method · 0.45
connect · Method · 0.45
tpc_recover · Method · 0.45
tpc_rollback · Method · 0.45

Tested by

no test coverage detected