(pgconn, generators, commands, expected_statuses)
| 79 | |
| 80 | |
def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
    """Collect result statuses from *pgconn* and assert they match.

    Results are gathered in rounds until at least ``len(expected_statuses)``
    statuses have been seen:

    - while *commands* is truthy, a round runs
      ``generators.pipeline_communicate(pgconn, commands)``; every item it
      produces must unpack to exactly one result;
    - otherwise a round runs ``generators.fetch_many(pgconn)`` and every
      item is itself a result.

    Both generators are driven with ``waiting.wait()`` on ``pgconn.socket``.

    NOTE(review): this presumably relies on ``pipeline_communicate``
    draining *commands* as it sends them, so that later rounds fall through
    to ``fetch_many`` -- confirm against the generators implementation.

    Raises:
        AssertionError: if the collected statuses differ from
            *expected_statuses* (including when more results than expected
            were received).
    """
    actual_statuses: list[pq.ExecStatus] = []
    # Use `<` rather than `!=`: if a round yields more results than expected,
    # the lengths would never be equal again and the loop would keep polling
    # the connection instead of reaching the assertion below.
    while len(actual_statuses) < len(expected_statuses):
        if commands:
            gen = generators.pipeline_communicate(pgconn, commands)
            results = waiting.wait(gen, pgconn.socket)
            # The one-element unpacking is deliberate: it fails loudly if an
            # item does not contain exactly one result.
            actual_statuses.extend(result.status for (result,) in results)
        else:
            gen = generators.fetch_many(pgconn)
            results = waiting.wait(gen, pgconn.socket)
            actual_statuses.extend(result.status for result in results)

    assert actual_statuses == expected_statuses
| 96 | |
| 97 | |
| 98 | @pytest.mark.pipeline |
no test coverage detected