MCPcopy
hub / github.com/psycopg/psycopg / test_pipeline_communicate_multi_pipeline

Function test_pipeline_communicate_multi_pipeline

tests/test_generators.py:99–114  ·  view source on GitHub ↗
(pgconn, pipeline, generators)

Source from the content-addressed store, hash-verified

97
98@pytest.mark.pipeline
99def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
100 commands = deque(
101 [
102 partial(pgconn.send_query_params, b"select 1", None),
103 pgconn.pipeline_sync,
104 partial(pgconn.send_query_params, b"select 2", None),
105 pgconn.pipeline_sync,
106 ]
107 )
108 expected_statuses = [
109 pq.ExecStatus.TUPLES_OK,
110 pq.ExecStatus.PIPELINE_SYNC,
111 pq.ExecStatus.TUPLES_OK,
112 pq.ExecStatus.PIPELINE_SYNC,
113 ]
114 _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
115
116
117@pytest.mark.pipeline

Callers

nothing calls this directly

Calls 1

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…