Generator implementing `Cursor.execute()`.
_execute_gen(
self,
query: Query,
params: Params | None = None,
*,
prepare: bool | None = None,
binary: bool | None = None,
) -> PQGen[None]
| 193 | # |
| 194 | |
| 195 | def _execute_gen( |
| 196 | self, |
| 197 | query: Query, |
| 198 | params: Params | None = None, |
| 199 | *, |
| 200 | prepare: bool | None = None, |
| 201 | binary: bool | None = None, |
| 202 | ) -> PQGen[None]: |
| 203 | """Generator implementing `Cursor.execute()`.""" |
| 204 | yield from self._start_query(query) |
| 205 | pgq = self._convert_query(query, params) |
| 206 | yield from self._maybe_prepare_gen(pgq, prepare=prepare, binary=binary) |
| 207 | if self._conn._pipeline: |
| 208 | yield from self._conn._pipeline._communicate_gen() |
| 209 | |
| 210 | self._last_query = query |
| 211 | yield from self._conn._prepared.maintain_gen(self._conn) |
| 212 | |
| 213 | def _executemany_gen_pipeline( |
| 214 | self, query: Query, params_seq: Iterable[Params], returning: bool |
no test coverage detected