MCPcopy
hub / github.com/psycopg/psycopg / _run_pipeline_communicate

Function _run_pipeline_communicate

tests/test_generators.py:81–95  ·  view source on GitHub ↗
(pgconn, generators, commands, expected_statuses)

Source from the content-addressed store, hash-verified

79
80
81def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
82 actual_statuses: list[pq.ExecStatus] = []
83 while len(actual_statuses) != len(expected_statuses):
84 if commands:
85 gen = generators.pipeline_communicate(pgconn, commands)
86 results = waiting.wait(gen, pgconn.socket)
87 for (result,) in results:
88 actual_statuses.append(result.status)
89 else:
90 gen = generators.fetch_many(pgconn)
91 results = waiting.wait(gen, pgconn.socket)
92 for result in results:
93 actual_statuses.append(result.status)
94
95 assert actual_statuses == expected_statuses
96
97
98@pytest.mark.pipeline

Calls 1

waitMethod · 0.45

Tested by

no test coverage detected