Close the connection, reset the watching state, and raise an exception if we were watching. The supported exceptions are already checked in the retry object, so we don't need to do it here. After we disconnect the connection, it will try to reconnect and do a health check as part of the send_command logic (on connection level).
(
self,
conn: Connection,
error: Exception,
failure_count: Optional[int] = None,
start_time: Optional[float] = None,
command_name: Optional[str] = None,
)
| 1753 | return self.pipeline_execute_command(*args, **kwargs) |
| 1754 | |
async def _disconnect_reset_raise_on_watching(
    self,
    conn: Connection,
    error: Exception,
    failure_count: Optional[int] = None,
    start_time: Optional[float] = None,
    command_name: Optional[str] = None,
) -> None:
    """
    Close the connection, reset the watching state and
    raise an exception if we were watching.

    The supported exceptions are already checked in the
    retry object so we don't need to do it here.

    After we disconnect the connection, it will try to reconnect and
    do a health check as part of the send_command logic (on connection level).

    Args:
        conn: The connection to disconnect. Its ``retry.get_retries()``
            value bounds the attempts for which a duration is recorded.
        error: The exception that triggered the disconnect. It is
            forwarded to ``conn.disconnect`` and to the duration record.
        failure_count: Number of failed attempts so far. When ``None``,
            no operation duration is recorded.
        start_time: ``time.monotonic()`` reading taken when the operation
            started. When ``None``, no operation duration is recorded
            because no elapsed time can be computed.
        command_name: Name of the command, passed through to the
            duration record.

    Raises:
        WatchError: If the pipeline was watching keys when the
            connection failed.
    """
    # ``start_time`` is optional, so it must be checked before it is used in
    # the subtraction below. Otherwise a TypeError would replace the original
    # error and the disconnect / WatchError handling would never run.
    if (
        error
        and failure_count is not None
        and start_time is not None
        and failure_count <= conn.retry.get_retries()
    ):
        await record_operation_duration(
            command_name=command_name,
            duration_seconds=time.monotonic() - start_time,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
            error=error,
            retry_attempts=failure_count,
        )
    await conn.disconnect(error=error, failure_count=failure_count)
    # if we were already watching a variable, the watch is no longer
    # valid since this connection has died. raise a WatchError, which
    # indicates the user should retry this transaction.
    if self.watching:
        await self.reset()
        raise WatchError(
            f"A {type(error).__name__} occurred while watching one or more keys"
        )
| 1796 | |
| 1797 | async def immediate_execute_command(self, *args, **options): |
| 1798 | """ |
no test coverage detected