MCPcopy
hub / github.com/psycopg/psycopg / test_recovered_xids

Method test_recovered_xids

tests/test_tpc_async.py:161–189  ·  view source on GitHub ↗
(self, aconn, tpc)

Source from the content-addressed store, hash-verified

159 assert aconn.info.transaction_status == TransactionStatus.INTRANS
160
async def test_recovered_xids(self, aconn, tpc):
    """Check ``tpc_recover()`` against the rows of ``pg_prepared_xacts``.

    Two transactions are prepared through ``aconn``; the xids recovered for
    the connection's database must then match, field by field, what the
    catalog view reports for that same database.

    ``tpc`` is not referenced in the body -- presumably it is requested as a
    fixture for its setup/cleanup; confirm against the fixture definition.
    """
    # Leave a couple of prepared transactions around to be recovered.
    await aconn.set_autocommit(True)
    cursor = aconn.cursor()
    for statement in (
        "begin; prepare transaction '1-foo'",
        "begin; prepare transaction '2-bar'",
    ):
        await cursor.execute(statement)

    dbname = aconn.info.dbname

    # Reference values, read straight from the catalog view.
    await cursor.execute(
        """
        select gid, prepared, owner, database from pg_prepared_xacts
        where database = %s
        """,
        (dbname,),
    )
    expected = sorted(await cursor.fetchall())

    # Only xids belonging to this database are compared; other databases
    # on the same server may hold prepared transactions of their own.
    recovered = sorted(
        (xid for xid in await aconn.tpc_recover() if xid.database == dbname),
        key=lambda x: x.gtrid,
    )

    # Both sequences are sorted, so they can be walked pairwise.
    assert len(expected) == len(recovered)
    for (gid, prepared, owner, database), xid in zip(expected, recovered):
        assert xid.gtrid == gid
        assert xid.prepared == prepared
        assert xid.owner == owner
        assert xid.database == database
190
191 async def test_xid_encoding(self, aconn, tpc):
192 xid = aconn.xid(42, "gtrid", "bqual")

Callers

nothing calls this directly

Calls 5

set_autocommit · Method · 0.45
cursor · Method · 0.45
execute · Method · 0.45
fetchall · Method · 0.45
tpc_recover · Method · 0.45

Tested by

no test coverage detected