(conn: psycopg.Connection[Any])
| 50 | |
| 51 | |
| 52 | def test_pipeline_reenter(conn: psycopg.Connection[Any]) -> None: |
| 53 | with conn.pipeline() as p1: |
| 54 | with conn.pipeline() as p2: |
| 55 | assert p2 is p1 |
| 56 | assert p1.status == pq.PipelineStatus.ON |
| 57 | assert p2 is p1 |
| 58 | assert p2.status == pq.PipelineStatus.ON |
| 59 | assert conn._pipeline is None |
| 60 | assert p1.status == pq.PipelineStatus.OFF # type: ignore[comparison-overlap] |
| 61 | |
| 62 | |
| 63 | def test_pipeline_broken_conn_exit(conn: psycopg.Connection[Any]) -> None: |