(cls, subject: _TConsSubject)
| 94 | |
| 95 | @classmethod |
| 96 | def _trans_ctx_check(cls, subject: _TConsSubject) -> None: |
| 97 | trans_context = subject._trans_context_manager |
| 98 | if trans_context: |
| 99 | if not trans_context._transaction_is_active(): |
| 100 | raise exc.InvalidRequestError( |
| 101 | "Can't operate on closed transaction inside context " |
| 102 | "manager. Please complete the context manager " |
| 103 | "before emitting further commands." |
| 104 | ) |
| 105 | |
| 106 | def __enter__(self) -> Self: |
| 107 | subject = self._get_subject() |
no test coverage detected