(self)
| 648 | yield from self._exec_command(self._get_tx_start_command()) |
| 649 | |
def _tpc_prepare_gen(self) -> PQGen[None]:
    """Generator sending ``PREPARE TRANSACTION`` for the current TPC xid.

    Reads the ``self._tpc`` pair: the first item is the transaction id
    and the second is treated as the "already prepared" flag.

    Raises:
        e.ProgrammingError: if ``self._tpc`` is unset/falsy (no two-phase
            transaction in progress), or if its second item is truthy
            (the transaction was already prepared).
    """
    tpc_state = self._tpc
    if not tpc_state:
        raise e.ProgrammingError(
            "'tpc_prepare()' must be called inside a two-phase transaction"
        )
    if tpc_state[1]:
        raise e.ProgrammingError(
            "'tpc_prepare()' cannot be used during a prepared two-phase transaction"
        )

    transaction_id = tpc_state[0]
    # The state is flagged as prepared *before* the command is sent,
    # matching the original statement order.
    self._tpc = (transaction_id, True)

    prepare_stmt = SQL("PREPARE TRANSACTION {}").format(str(transaction_id))
    yield from self._exec_command(prepare_stmt)

    # Looked up only after the command has run, as in the original.
    pipeline = self._pipeline
    if pipeline:
        yield from pipeline._sync_gen()
| 664 | |
| 665 | def _tpc_finish_gen( |
| 666 | self, action: LiteralString, xid: Xid | str | None |
no test coverage detected