(self, aconn_cls, aconn, dsn, tpc)
| 289 | await aconn.cancel_safe() |
| 290 | |
| 291 | async def test_tpc_recover_non_dbapi_connection(self, aconn_cls, aconn, dsn, tpc): |
| 292 | aconn.row_factory = psycopg.rows.dict_row |
| 293 | await aconn.tpc_begin("dict-connection") |
| 294 | await aconn.tpc_prepare() |
| 295 | await aconn.close() |
| 296 | |
| 297 | async with await aconn_cls.connect(dsn) as aconn: |
| 298 | xids = await aconn.tpc_recover() |
| 299 | xid = [x for x in xids if x.database == aconn.info.dbname][0] |
| 300 | |
| 301 | assert xid.format_id is None |
| 302 | assert xid.gtrid == "dict-connection" |
| 303 | assert xid.bqual is None |
nothing calls this directly
no test coverage detected