MCPcopy
hub / github.com/psycopg/psycopg / test_recovered_xids

Method test_recovered_xids

tests/test_tpc.py:165–193  ·  view source on GitHub ↗
(self, conn, tpc)

Source from the content-addressed store, hash-verified

163 assert conn.info.transaction_status == TransactionStatus.INTRANS
164
165 def test_recovered_xids(self, conn, tpc):
166 # insert a few test xns
167 conn.set_autocommit(True)
168 cur = conn.cursor()
169 cur.execute("begin; prepare transaction '1-foo'")
170 cur.execute("begin; prepare transaction '2-bar'")
171
172 # read the values to return
173 cur.execute(
174 """
175 select gid, prepared, owner, database from pg_prepared_xacts
176 where database = %s
177 """,
178 (conn.info.dbname,),
179 )
180 okvals = cur.fetchall()
181 okvals.sort()
182
183 xids = conn.tpc_recover()
184 xids = [xid for xid in xids if xid.database == conn.info.dbname]
185 xids.sort(key=lambda x: x.gtrid)
186
187 # check the values returned
188 assert len(okvals) == len(xids)
189 for xid, (gid, prepared, owner, database) in zip(xids, okvals):
190 assert xid.gtrid == gid
191 assert xid.prepared == prepared
192 assert xid.owner == owner
193 assert xid.database == database
194
195 def test_xid_encoding(self, conn, tpc):
196 xid = conn.xid(42, "gtrid", "bqual")

Callers

nothing calls this directly

Calls 5

set_autocommitMethod · 0.45
cursorMethod · 0.45
executeMethod · 0.45
fetchallMethod · 0.45
tpc_recoverMethod · 0.45

Tested by

no test coverage detected