hub / github.com/psycopg/psycopg / test_pipeline_communicate_uniqviol

Function test_pipeline_communicate_uniqviol

tests/test_generators.py:197–216
(pgconn, pipeline_uniqviol, pipeline, generators)

Source from the content-addressed store, hash-verified

def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, generators):
    commands = deque(
        [
            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
            partial(pgconn.send_query_prepared, b"insertion", [b"2", b"2"]),
            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
            partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
            partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
            partial(pgconn.send_query_params, b"commit", None),
        ]
    )
    expected_statuses = [
        pq.ExecStatus.TUPLES_OK,
        pq.ExecStatus.TUPLES_OK,
        pq.ExecStatus.FATAL_ERROR,
        pq.ExecStatus.PIPELINE_ABORTED,
        pq.ExecStatus.PIPELINE_ABORTED,
        pq.ExecStatus.PIPELINE_ABORTED,
    ]
    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
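
The expected status sequence in this test can be reasoned about with a toy model that needs no database. The sketch below rests on two assumptions that this page does not confirm: that the `insertion` prepared statement writes its first parameter into a column with a unique constraint (the `pipeline_uniqviol` fixture that defines it is not shown here), and that no sync point is processed before the last queued command. `Status` and `simulate_pipeline` are hypothetical names used only for illustration; they are not part of psycopg.

```python
from enum import Enum, auto


class Status(Enum):
    # Hypothetical stand-ins for the pq.ExecStatus members named in the test.
    TUPLES_OK = auto()
    COMMAND_OK = auto()
    FATAL_ERROR = auto()
    PIPELINE_ABORTED = auto()


def simulate_pipeline(commands):
    """Return the status each queued command gets in this toy model."""
    seen_ids = set()  # first parameters inserted so far (assumed unique column)
    aborted = False   # set once a command fails; no sync point is modeled
    statuses = []
    for kind, params in commands:
        if aborted:
            # Everything queued after a failure is skipped.
            statuses.append(Status.PIPELINE_ABORTED)
        elif kind == "insert" and params[0] in seen_ids:
            # A duplicate key fails and puts the pipeline in the aborted state.
            statuses.append(Status.FATAL_ERROR)
            aborted = True
        elif kind == "insert":
            seen_ids.add(params[0])
            statuses.append(Status.TUPLES_OK)
        else:
            statuses.append(Status.COMMAND_OK)
    return statuses


# Same order of commands as in the test above.
commands = [
    ("insert", (b"1", b"2")),
    ("insert", (b"2", b"2")),
    ("insert", (b"1", b"2")),  # repeats the first key
    ("insert", (b"3", b"2")),
    ("insert", (b"4", b"2")),
    ("commit", None),
]
statuses = simulate_pipeline(commands)
print([s.name for s in statuses])
```

Under these assumptions the third command is the one that fails, and the three commands queued after it, including the commit, are reported as aborted, which matches the shape of `expected_statuses`.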

Callers

nothing calls this directly

Calls 1

_run_pipeline_communicate

Tested by

no test coverage detected