(pgconn, pipeline_uniqviol, pipeline, generators)
| 195 | |
| 196 | |
def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, generators):
    """Check per-command result statuses when one queued command fails.

    Five executions of the prepared statement ``b"insertion"`` are queued,
    followed by a ``commit``. The third execution repeats the parameters of
    the first and is expected to report ``FATAL_ERROR``; every command queued
    after it is expected to report ``PIPELINE_ABORTED``.

    The ``pipeline_uniqviol`` and ``pipeline`` fixtures are not referenced in
    the body.
    NOTE(review): presumably they prepare the ``insertion`` statement, a
    unique constraint and pipeline mode on ``pgconn`` -- confirm against the
    fixture definitions.
    """
    # First parameter of each insert, in send order; b"1" appears twice on
    # purpose, and the second parameter is always b"2".
    first_params = (b"1", b"2", b"1", b"3", b"4")

    commands = deque(
        partial(pgconn.send_query_prepared, b"insertion", [first, b"2"])
        for first in first_params
    )
    commands.append(partial(pgconn.send_query_params, b"commit", None))

    # One expected status per queued command, in the same order.
    expected_statuses = (
        [pq.ExecStatus.TUPLES_OK] * 2
        + [pq.ExecStatus.FATAL_ERROR]
        + [pq.ExecStatus.PIPELINE_ABORTED] * 3
    )

    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
nothing calls this directly
no test coverage detected