(self, aconn_cls, aconn, dsn, tpc)
| 261 | assert "code" == xid.bqual |
| 262 | |
async def test_xid_unicode_unparsed(self, aconn_cls, aconn, dsn, tpc):
    """Check that a plain string transaction id survives prepare/recover.

    A two-phase transaction is begun with the id ``"transaction-id"``,
    prepared, and its connection closed. A new connection then recovers
    the pending transactions for the current database, and the first one
    is expected to expose the id as ``gtrid`` with ``format_id`` and
    ``bqual`` both unset.
    """
    # Exotic characters (snowmen and friends) are not expected in
    # transaction ids, so an encode error blowing up elsewhere would be
    # acceptable. The only goal here is that unicode is accepted as type.
    await aconn.execute("set client_encoding to utf8")
    await aconn.commit()

    # Leave a prepared transaction behind, then drop the connection.
    await aconn.tpc_begin("transaction-id")
    await aconn.tpc_prepare()
    await aconn.close()

    async with await aconn_cls.connect(dsn) as aconn:
        recovered = await aconn.tpc_recover()
        # Only look at transactions belonging to the database under test.
        same_db = [x for x in recovered if x.database == aconn.info.dbname]
        xid = same_db[0]

    assert xid.format_id is None
    assert xid.gtrid == "transaction-id"
    assert xid.bqual is None
| 282 | |
| 283 | async def test_cancel_fails_prepared(self, aconn, tpc): |
| 284 | await aconn.tpc_begin("cancel") |
nothing calls this directly
no test coverage detected