Generator implementing `Cursor.executemany()` with pipelines not available.
(
self, query: Query, params_seq: Iterable[Params], returning: bool
)
| 246 | yield from self._conn._prepared.maintain_gen(self._conn) |
| 247 | |
| 248 | def _executemany_gen_no_pipeline( |
| 249 | self, query: Query, params_seq: Iterable[Params], returning: bool |
| 250 | ) -> PQGen[None]: |
| 251 | """ |
| 252 | Generator implementing `Cursor.executemany()` with pipelines not available. |
| 253 | """ |
| 254 | yield from self._start_query(query) |
| 255 | if not returning: |
| 256 | self._rowcount = 0 |
| 257 | |
| 258 | assert self._execmany_returning is None |
| 259 | self._execmany_returning = returning |
| 260 | |
| 261 | first = True |
| 262 | for params in params_seq: |
| 263 | if first: |
| 264 | pgq = self._convert_query(query, params) |
| 265 | self._query = pgq |
| 266 | first = False |
| 267 | else: |
| 268 | pgq.dump(params) |
| 269 | |
| 270 | yield from self._maybe_prepare_gen(pgq, prepare=True) |
| 271 | |
| 272 | self._last_query = query |
| 273 | yield from self._conn._prepared.maintain_gen(self._conn) |
| 274 | |
| 275 | def _maybe_prepare_gen( |
| 276 | self, |
no test coverage detected