(pgconn, pipeline, generators)
| 97 | |
| 98 | @pytest.mark.pipeline |
| 99 | def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators): |
| 100 | commands = deque( |
| 101 | [ |
| 102 | partial(pgconn.send_query_params, b"select 1", None), |
| 103 | pgconn.pipeline_sync, |
| 104 | partial(pgconn.send_query_params, b"select 2", None), |
| 105 | pgconn.pipeline_sync, |
| 106 | ] |
| 107 | ) |
| 108 | expected_statuses = [ |
| 109 | pq.ExecStatus.TUPLES_OK, |
| 110 | pq.ExecStatus.PIPELINE_SYNC, |
| 111 | pq.ExecStatus.TUPLES_OK, |
| 112 | pq.ExecStatus.PIPELINE_SYNC, |
| 113 | ] |
| 114 | _run_pipeline_communicate(pgconn, generators, commands, expected_statuses) |
| 115 | |
| 116 | |
| 117 | @pytest.mark.pipeline |
nothing calls this directly
no test coverage detected
searching dependent graphs…