| 267 | |
@pytest.mark.crdb_skip("backend pid")
async def test_intrans_rollback(dsn, caplog):
    """Hand back a connection left INTRANS and check what the next client sees.

    The main task takes the only connection of a ``max_size=1`` null pool,
    creates a table without committing, and returns the connection while a
    worker is already queued for one. The worker must receive a connection
    with the same backend pid, in IDLE status, and with the table absent
    from ``pg_class``. Exactly one log record must be captured, and its
    message must mention ``INTRANS``.
    """
    seen_pids = []
    caplog.set_level(logging.WARNING, logger="psycopg.pool")

    async def worker():
        # Spawned while the main task still holds the connection; the body
        # below runs once the pool hands a connection to this task.
        async with null_pool.connection() as reused:
            seen_pids.append(reused.info.backend_pid)
            status = reused.info.transaction_status
            assert status == TransactionStatus.IDLE
            cursor = await reused.execute(
                "select 1 from pg_class where relname = 'test_intrans_rollback'"
            )
            row = await cursor.fetchone()
            assert not row

    async with pool.AsyncNullConnectionPool(dsn, max_size=1) as null_pool:
        held = await null_pool.getconn()

        # Queue the worker so it will take the connection a second time instead
        # of making a new one.
        task = spawn(worker)
        await ensure_waiting(null_pool)

        seen_pids.append(held.info.backend_pid)
        # Leave a transaction open on purpose before giving the connection back.
        await held.execute("create table test_intrans_rollback ()")
        assert held.info.transaction_status == TransactionStatus.INTRANS
        await null_pool.putconn(held)
        await gather(task)

    # Both tasks must have been served by the same backend.
    assert seen_pids[0] == seen_pids[1]

    records = caplog.records
    assert len(records) == 1
    assert "INTRANS" in records[0].message
| 299 | |
| 300 | |
| 301 | @pytest.mark.crdb_skip("backend pid") |