(self, xid: Xid | str)
| 628 | return Xid.from_parts(format_id, gtrid, bqual) |
| 629 | |
def _tpc_begin_gen(self, xid: Xid | str) -> PQGen[None]:
    """Generator starting a two-phase transaction identified by *xid*.

    *xid* may be an `Xid` instance or any other value accepted by
    `Xid.from_string()`, which is used to convert it.

    Raises `ProgrammingError` if the connection's transaction status is
    not `IDLE` or if the connection is in autocommit mode. Errors raised
    by `_check_tpc()` or `Xid.from_string()` propagate unchanged.
    """
    # Precondition check delegated to the connection
    # (presumably verifies two-phase commit support -- confirm in _check_tpc).
    self._check_tpc()

    # Normalize the transaction id: anything that is not already an Xid
    # is parsed from its string form.
    tpc_xid = xid if isinstance(xid, Xid) else Xid.from_string(xid)

    # A two-phase transaction can only be started from an idle connection.
    status = self.pgconn.transaction_status
    if status != IDLE:
        status_name = pq.TransactionStatus(status).name
        raise e.ProgrammingError(
            "can't start two-phase transaction: connection in status"
            f" {status_name}"
        )

    if self._autocommit:
        raise e.ProgrammingError(
            "can't use two-phase transactions in autocommit mode"
        )

    # Record the transaction id before issuing the start command; the
    # second item starts as False (NOTE(review): looks like a "prepared"
    # flag -- confirm against the other _tpc_* methods).
    self._tpc = (tpc_xid, False)
    start_command = self._get_tx_start_command()
    yield from self._exec_command(start_command)
| 649 | |
| 650 | def _tpc_prepare_gen(self) -> PQGen[None]: |
| 651 | if not self._tpc: |
no test coverage detected