(pgconn, pipeline_demo, pipeline, generators)
@pytest.mark.pipeline
@pytest.mark.crdb("skip", reason="pipeline aborted")
def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
    """Run a pipeline containing a failing statement and check what survives.

    Six commands are queued: an insert, a call to a function that does not
    exist, a second insert, a sync, a third insert and a final sync. They are
    handed to ``_run_pipeline_communicate`` together with the result statuses
    expected from them. After leaving pipeline mode, ``pg_pipeline`` must hold
    exactly one row, whose ``itemno`` is ``b"3"``.
    """
    stmt = b"insert into pg_pipeline(itemno) values ($1)"
    send_params = pgconn.send_query_params

    # Command / expected-status table. Both sequences built from it keep this
    # order; presumably the helper compares them position by position
    # (NOTE(review): confirm against _run_pipeline_communicate).
    steps = (
        (partial(send_params, stmt, [b"1"]), pq.ExecStatus.COMMAND_OK),
        (
            partial(send_params, b"select no_such_function(1)", None),
            pq.ExecStatus.FATAL_ERROR,
        ),
        (partial(send_params, stmt, [b"2"]), pq.ExecStatus.PIPELINE_ABORTED),
        (pgconn.pipeline_sync, pq.ExecStatus.PIPELINE_SYNC),
        (partial(send_params, stmt, [b"3"]), pq.ExecStatus.COMMAND_OK),
        (pgconn.pipeline_sync, pq.ExecStatus.PIPELINE_SYNC),
    )
    queue = deque([command for command, _ in steps])
    statuses = [status for _, status in steps]

    _run_pipeline_communicate(pgconn, generators, queue, statuses)
    pgconn.exit_pipeline_mode()

    # Only the insert issued after the first sync is expected to be stored.
    rows = pgconn.exec_(b"select itemno from pg_pipeline order by itemno")
    assert rows.ntuples == 1
    assert rows.get_value(0, 0) == b"3"
| 171 | |
| 172 | |
| 173 | @pytest.fixture |
nothing calls this directly
no test coverage detected