Generator implementing `Cursor.executemany()` with pipelines available.
(
self, query: Query, params_seq: Iterable[Params], returning: bool
)
| 211 | yield from self._conn._prepared.maintain_gen(self._conn) |
| 212 | |
def _executemany_gen_pipeline(
    self, query: Query, params_seq: Iterable[Params], returning: bool
) -> PQGen[None]:
    """
    Generator implementing `Cursor.executemany()` when a pipeline is in use.

    The query is converted using the first parameter set only; each later
    parameter set is dumped into that same converted query object before
    it is sent.

    `params_seq` is iterated once, in order. If `returning` is false the
    cursor row count is reset to 0 before anything is sent; if it is true
    the pipeline is asked to fetch (with flush) once every parameter set
    has been processed.
    """
    pipe = self._conn._pipeline
    # This code path is only valid while the connection has a pipeline.
    assert pipe

    yield from self._start_query(query)
    if not returning:
        self._rowcount = 0

    # Record the mode for this run; it must not already be set here.
    assert self._execmany_returning is None
    self._execmany_returning = returning

    for position, params in enumerate(params_seq):
        if position == 0:
            # First parameter set: build the converted query and keep it.
            converted = self._convert_query(query, params)
            self._query = converted
        else:
            # Later parameter sets reuse the already converted query.
            converted.dump(params)

        yield from self._maybe_prepare_gen(converted, prepare=True)
        yield from pipe._communicate_gen()

    self._last_query = query

    if returning:
        yield from pipe._fetch_gen(flush=True)

    yield from self._conn._prepared.maintain_gen(self._conn)
| 247 | |
| 248 | def _executemany_gen_no_pipeline( |
| 249 | self, query: Query, params_seq: Iterable[Params], returning: bool |
no test coverage detected