(pgconn, table)
| 95 | |
| 96 | @pytest.mark.libpq(">= 14") |
| 97 | def test_pipeline_abort(pgconn, table): |
| 98 | assert pgconn.pipeline_status == pq.PipelineStatus.OFF |
| 99 | pgconn.enter_pipeline_mode() |
| 100 | pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"]) |
| 101 | pgconn.send_query_params(b"select no_such_function($1)", [b"1"]) |
| 102 | pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"]) |
| 103 | pgconn.pipeline_sync() |
| 104 | pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"]) |
| 105 | pgconn.pipeline_sync() |
| 106 | |
| 107 | # result from first INSERT |
| 108 | r = pgconn.get_result() |
| 109 | assert r is not None |
| 110 | assert r.status == pq.ExecStatus.COMMAND_OK |
| 111 | |
| 112 | # NULL signals end of result |
| 113 | assert pgconn.get_result() is None |
| 114 | |
| 115 | # error result from second query (SELECT) |
| 116 | r = pgconn.get_result() |
| 117 | assert r is not None |
| 118 | assert r.status == pq.ExecStatus.FATAL_ERROR |
| 119 | |
| 120 | # NULL signals end of result |
| 121 | assert pgconn.get_result() is None |
| 122 | |
| 123 | # pipeline should be aborted, due to previous error |
| 124 | assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED |
| 125 | |
| 126 | # result from second INSERT, aborted due to previous error |
| 127 | r = pgconn.get_result() |
| 128 | assert r is not None |
| 129 | assert r.status == pq.ExecStatus.PIPELINE_ABORTED |
| 130 | |
| 131 | # NULL signals end of result |
| 132 | assert pgconn.get_result() is None |
| 133 | |
| 134 | # pipeline is still aborted |
| 135 | assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED |
| 136 | |
| 137 | # sync result |
| 138 | r = pgconn.get_result() |
| 139 | assert r is not None |
| 140 | assert r.status == pq.ExecStatus.PIPELINE_SYNC |
| 141 | |
| 142 | # aborted flag is clear, pipeline is on again |
| 143 | assert pgconn.pipeline_status == pq.PipelineStatus.ON |
| 144 | |
| 145 | # result from the third INSERT |
| 146 | r = pgconn.get_result() |
| 147 | assert r is not None |
| 148 | assert r.status == pq.ExecStatus.COMMAND_OK |
| 149 | |
| 150 | # NULL signals end of result |
| 151 | assert pgconn.get_result() is None |
| 152 | |
| 153 | # second sync result |
| 154 | r = pgconn.get_result() |
nothing calls this directly
no test coverage detected