(
self, e: Optional[BaseException] = None, soft: bool = False
)
| 782 | self.__close() |
| 783 | |
def invalidate(
    self, e: Optional[BaseException] = None, soft: bool = False
) -> None:
    """Invalidate the DBAPI connection held by this record.

    Does nothing when ``dbapi_connection`` is already ``None``.
    Otherwise the pool's ``soft_invalidate`` or ``invalidate`` event is
    dispatched, the invalidation is logged through the pool's logger,
    and the record is updated:

    * soft: the current time is stored in ``_soft_invalidate_time`` and
      the connection is left in place;
    * hard: the connection is closed with ``terminate=True`` and
      ``dbapi_connection`` is reset to ``None``.

    :param e: optional exception given as the reason for invalidating;
     it is passed to the event hook and included in the log message.
    :param soft: when true, perform a soft invalidation instead of
     closing the connection.
    """
    if self.dbapi_connection is None:
        # Nothing left to invalidate.
        return

    # Select the event hook matching the kind of invalidation.
    dispatcher = self.__pool.dispatch
    notify = dispatcher.soft_invalidate if soft else dispatcher.invalidate
    notify(self.dbapi_connection, self, e)

    prefix = "Soft " if soft else ""
    if e is None:
        self.__pool.logger.info(
            "%sInvalidate connection %r",
            prefix,
            self.dbapi_connection,
        )
    else:
        self.__pool.logger.info(
            "%sInvalidate connection %r (reason: %s:%s)",
            prefix,
            self.dbapi_connection,
            e.__class__.__name__,
            e,
        )

    if soft:
        # Only record when the soft invalidation happened.
        self._soft_invalidate_time = time.time()
        return

    self.__close(terminate=True)
    self.dbapi_connection = None
| 816 | |
| 817 | def get_connection(self) -> DBAPIConnection: |
| 818 | recycle = False |
no test coverage detected