(self, aconn, tpc)
| 189 | assert xid.database == database |
| 190 | |
| 191 | async def test_xid_encoding(self, aconn, tpc): |
| 192 | xid = aconn.xid(42, "gtrid", "bqual") |
| 193 | await aconn.tpc_begin(xid) |
| 194 | await aconn.tpc_prepare() |
| 195 | |
| 196 | cur = aconn.cursor() |
| 197 | await cur.execute( |
| 198 | "select gid from pg_prepared_xacts where database = %s", |
| 199 | (aconn.info.dbname,), |
| 200 | ) |
| 201 | assert "42_Z3RyaWQ=_YnF1YWw=" == (await cur.fetchone())[0] |
| 202 | |
| 203 | @pytest.mark.parametrize( |
| 204 | "fid, gtrid, bqual", |