(pgconn)
| 29 | |
@pytest.mark.libpq(">= 14")
def test_multi_pipelines(pgconn):
    """Check result ordering for two query/sync pairs in one pipeline.

    Two parameterized queries are queued, each followed by its own
    pipeline sync. For each query the results are expected in this order:
    a TUPLES_OK result, a None marking the end of that result, then a
    PIPELINE_SYNC result. The returned values are verified only after
    pipeline mode has been exited.
    """
    assert pgconn.pipeline_status == pq.PipelineStatus.OFF
    pgconn.enter_pipeline_mode()

    # Queue everything up front: one sync point after each query.
    params = (b"1", b"2")
    for param in params:
        # NOTE(review): 25 is presumably the text type OID -- confirm.
        pgconn.send_query_params(b"select $1", [param], param_types=[25])
        pgconn.pipeline_sync()

    query_results = []
    for _ in params:
        # The query's own result comes first...
        res = pgconn.get_result()
        assert res is not None
        assert res.status == pq.ExecStatus.TUPLES_OK
        query_results.append(res)

        # ...then None signals the end of that result...
        assert pgconn.get_result() is None

        # ...and finally the result for the sync queued after the query.
        sync_result = pgconn.get_result()
        assert sync_result is not None
        assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC

    # Draining the results must not leave pipeline mode by itself.
    assert pgconn.pipeline_status == pq.PipelineStatus.ON

    pgconn.exit_pipeline_mode()

    assert pgconn.pipeline_status == pq.PipelineStatus.OFF

    # Results fetched in pipeline mode are still readable after exiting it.
    for res, param in zip(query_results, params):
        assert res.get_value(0, 0) == param
| 74 | |
| 75 | |
| 76 | @pytest.mark.libpq(">= 14") |
nothing calls this directly
no test coverage detected