Generator implementing `ServerCursor.execute()`.
(
self, query: Query, params: Params | None = None, binary: bool | None = None
)
| 100 | return self._pos if tuples else None |
| 101 | |
def _declare_gen(
    self, query: Query, params: Params | None = None, binary: bool | None = None
) -> PQGen[None]:
    """Generator implementing `ServerCursor.execute()`.

    Build the declare statement for *query*, send it together with
    *params*, then describe the resulting cursor. *binary* selects the
    format stored for later describe and fetch operations; when it is
    `None` the value of `self.format` is used instead.
    """
    declare_stmt = self._make_declare_statement(query)

    # A cursor that was already described is being reused: the previous
    # one must be closed before declaring again.
    if self._described:
        yield from self._close_gen()
        self._described = False

    self._iter_rows = None
    yield from self._start_query(declare_stmt)
    converted = self._convert_query(declare_stmt, params)
    self._execute_send(converted, force_extended=True)
    results = yield from execute(self._conn.pgconn)

    # Only the final result is inspected for success.
    last_result = results[-1]
    if last_result.status != COMMAND_OK:
        self._raise_for_result(last_result)

    # Record the format, which describe and fetch operations will use.
    if binary is None:
        chosen_format = self.format
    elif binary:
        chosen_format = BINARY
    else:
        chosen_format = TEXT
    self._format = chosen_format

    # The result above only carried COMMAND_OK: fetch the cursor shape.
    yield from self._describe_gen()
| 131 | def _describe_gen(self) -> PQGen[None]: |
| 132 | self._pgconn.send_describe_portal(self._name.encode(self._encoding)) |
no test coverage detected