hub / github.com/psycopg/psycopg / test_tpc_recover_non_dbapi_connection

Method test_tpc_recover_non_dbapi_connection

tests/test_tpc_async.py:291–303
(self, aconn_cls, aconn, dsn, tpc)

Source from the content-addressed store, hash-verified

async def test_tpc_recover_non_dbapi_connection(self, aconn_cls, aconn, dsn, tpc):
    aconn.row_factory = psycopg.rows.dict_row
    await aconn.tpc_begin("dict-connection")
    await aconn.tpc_prepare()
    await aconn.close()

    async with await aconn_cls.connect(dsn) as aconn:
        xids = await aconn.tpc_recover()
        xid = [x for x in xids if x.database == aconn.info.dbname][0]

    assert xid.format_id is None
    assert xid.gtrid == "dict-connection"
    assert xid.bqual is None
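
The assertions above expect a transaction started with a plain string identifier to come back from recovery with `format_id` and `bqual` set to `None` and the whole string kept in `gtrid`. The following is a minimal sketch of that fallback behaviour, not psycopg's actual implementation. It assumes a DBAPI-style identifier is encoded as `<format_id>_<base64 gtrid>_<base64 bqual>`; that encoding, and the names `SketchXid`, `encode_xid`, and `parse_xid`, are hypothetical and used only for illustration.

```python
import base64
import re
from typing import NamedTuple, Optional

# Assumed (hypothetical) wire format: "<format_id>_<base64 gtrid>_<base64 bqual>".
_DBAPI_XID = re.compile(r"^(\d+)_([^_]*)_([^_]*)$")


class SketchXid(NamedTuple):
    """Illustrative stand-in for a transaction id; not psycopg's Xid class."""

    format_id: Optional[int]
    gtrid: str
    bqual: Optional[str]


def encode_xid(format_id: int, gtrid: str, bqual: str) -> str:
    """Pack the three parts into a single identifier string."""
    g = base64.b64encode(gtrid.encode()).decode()
    b = base64.b64encode(bqual.encode()).decode()
    return f"{format_id}_{g}_{b}"


def parse_xid(s: str) -> SketchXid:
    """Split a three-part identifier, or keep the raw string as gtrid."""
    m = _DBAPI_XID.match(s)
    if m:
        try:
            gtrid = base64.b64decode(m.group(2), validate=True).decode()
            bqual = base64.b64decode(m.group(3), validate=True).decode()
            return SketchXid(int(m.group(1)), gtrid, bqual)
        except ValueError:
            # Invalid base64 or non-UTF-8 bytes: fall through to the raw form.
            pass
    # Not a three-part identifier: no format_id, no bqual.
    return SketchXid(None, s, None)


print(parse_xid("dict-connection"))
print(parse_xid(encode_xid(42, "gtrid", "bqual")))
```

Under these assumptions, a string such as `"dict-connection"` does not match the three-part pattern, so it is returned unchanged as `gtrid`, which mirrors what the test checks after `tpc_recover()`.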

Callers

nothing calls this directly

Calls 5

tpc_begin · Method · 0.45
tpc_prepare · Method · 0.45
close · Method · 0.45
connect · Method · 0.45
tpc_recover · Method · 0.45

Tested by

no test coverage detected