MCPcopy
hub / github.com/psycopg/psycopg / test_pipeline_communicate_abort

Function test_pipeline_communicate_abort

tests/test_generators.py:146–170  ·  view source on GitHub ↗
(pgconn, pipeline_demo, pipeline, generators)

Source from the content-addressed store, hash-verified

144@pytest.mark.pipeline
145@pytest.mark.crdb("skip", reason="pipeline aborted")
146def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
147 insert_sql = b"insert into pg_pipeline(itemno) values ($1)"
148 commands = deque(
149 [
150 partial(pgconn.send_query_params, insert_sql, [b"1"]),
151 partial(pgconn.send_query_params, b"select no_such_function(1)", None),
152 partial(pgconn.send_query_params, insert_sql, [b"2"]),
153 pgconn.pipeline_sync,
154 partial(pgconn.send_query_params, insert_sql, [b"3"]),
155 pgconn.pipeline_sync,
156 ]
157 )
158 expected_statuses = [
159 pq.ExecStatus.COMMAND_OK,
160 pq.ExecStatus.FATAL_ERROR,
161 pq.ExecStatus.PIPELINE_ABORTED,
162 pq.ExecStatus.PIPELINE_SYNC,
163 pq.ExecStatus.COMMAND_OK,
164 pq.ExecStatus.PIPELINE_SYNC,
165 ]
166 _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
167 pgconn.exit_pipeline_mode()
168 res = pgconn.exec_(b"select itemno from pg_pipeline order by itemno")
169 assert res.ntuples == 1
170 assert res.get_value(0, 0) == b"3"
171
172
173@pytest.fixture

Callers

nothing calls this directly

Calls 4

exit_pipeline_modeMethod · 0.45
exec_Method · 0.45
get_valueMethod · 0.45

Tested by

no test coverage detected