| 268 | |
@pytest.mark.crdb_skip("backend pid")
def test_intrans_rollback(dsn, caplog):
    """Check that a connection returned mid-transaction is clean when reused.

    The main flow checks out the only connection of a ``NullConnectionPool``,
    creates a table without committing, and returns the connection while it
    is still ``INTRANS``. A queued worker must then get the same backend
    (same pid) in ``IDLE`` state, must not see the table, and the pool must
    have logged exactly one warning mentioning ``INTRANS``.
    """
    caplog.set_level(logging.WARNING, logger="psycopg.pool")
    backend_pids = []

    def worker():
        # Expected to be queued on the pool until the connection is returned.
        with null_pool.connection() as reused:
            backend_pids.append(reused.info.backend_pid)
            assert reused.info.transaction_status == TransactionStatus.IDLE
            lookup = reused.execute(
                "select 1 from pg_class where relname = 'test_intrans_rollback'"
            )
            # The uncommitted table must not be visible to the next client.
            assert not lookup.fetchone()

    with pool.NullConnectionPool(dsn, max_size=1) as null_pool:
        held = null_pool.getconn()

        # Start the worker while the only connection is checked out, so that
        # it queues up and receives this same connection rather than a new one.
        task = spawn(worker)
        ensure_waiting(null_pool)

        backend_pids.append(held.info.backend_pid)
        held.execute("create table test_intrans_rollback ()")
        assert held.info.transaction_status == TransactionStatus.INTRANS
        null_pool.putconn(held)
        gather(task)

    # Both clients must have been served by the same backend process.
    first_pid, second_pid = backend_pids[0], backend_pids[1]
    assert first_pid == second_pid

    # Exactly one warning, and it names the offending transaction status.
    records = caplog.records
    assert len(records) == 1
    assert "INTRANS" in records[0].message
| 300 | |
| 301 | |
| 302 | @pytest.mark.crdb_skip("backend pid") |