(self, aconn_cls, aconn, dsn, tpc, fid, gtrid, bqual)
| 209 | ], |
| 210 | ) |
# Round-trip check for a TPC transaction id (xid): build an xid from
# (fid, gtrid, bqual), begin and prepare a transaction under it, close the
# connection, then recover the xid from a fresh connection and compare its
# fields with the inputs.
# fid/gtrid/bqual presumably come from the parametrize decorator above - confirm.
# `tpc` is never referenced in the body; presumably it is a fixture requested
# only for its setup/teardown - TODO confirm against the fixture definition.
| 211 | async def test_xid_roundtrip(self, aconn_cls, aconn, dsn, tpc, fid, gtrid, bqual): |
| 212 | xid = aconn.xid(fid, gtrid, bqual) |
| 213 | await aconn.tpc_begin(xid) |
| 214 | await aconn.tpc_prepare() |
# Close the connection that prepared the transaction; the lookup below is
# done from a different connection.
| 215 | await aconn.close() |
| 216 | |
# `aconn` is rebound here to a new connection opened from `dsn`.
| 217 | async with await aconn_cls.connect(dsn) as aconn: |
# Keep only the recovered xids whose `database` equals this connection's
# dbname (presumably to ignore transactions prepared in other databases).
| 218 | xids = [ |
| 219 | x for x in await aconn.tpc_recover() if x.database == aconn.info.dbname |
| 220 | ] |
# Exactly one matching xid is expected.
| 221 | assert len(xids) == 1 |
# From here on `xid` is the recovered object, not the one built at the top,
# so the assertions below inspect what tpc_recover() returned.
| 222 | xid = xids[0] |
| 223 | await aconn.tpc_rollback(xid) |
| 224 | |
# The recovered xid must expose the same three components that were passed in.
| 225 | assert xid.format_id == fid |
| 226 | assert xid.gtrid == gtrid |
| 227 | assert xid.bqual == bqual |
| 228 | |
| 229 | # 199 is PostgreSQL's limit in transaction id length |
| 230 | @pytest.mark.parametrize("tid", ["", "hello, world!", "x" * 199]) |
nothing calls this directly
no test coverage detected