Execute a command immediately, but don't auto-retry on retry-supported errors if we're already WATCHing a variable. Used when issuing WATCH, or subsequent commands that retrieve the watched values, before MULTI is called.
(self, *args, **options)
| 1795 | ) |
| 1796 | |
async def immediate_execute_command(self, *args, **options):
    """
    Run one command right away instead of queueing it.

    Auto-retry on the retry-supported errors is not performed when a
    variable is already being WATCHed. This path is used for WATCH itself
    and for the commands that read watched values before MULTI is called.

    The first positional argument is taken as the command name. When no
    connection is held yet, one is obtained from ``self.connection_pool``
    and stored on ``self.connection``. The call is driven through
    ``conn.retry.call_with_retry``; every failure is handed to
    ``self._disconnect_reset_raise_on_watching``.

    On success the operation duration is recorded and the response is
    returned. On any ``Exception`` an error count is recorded (including
    the last failure count reported by the retry helper) and the
    exception is re-raised unchanged.
    """
    command_name = args[0]

    # The very first call has no connection yet: take one from the pool
    # and keep it for the calls that follow.
    conn = self.connection
    if not conn:
        conn = self.connection = await self.connection_pool.get_connection()

    # Monotonic reference point for the duration metric.
    started_at = time.monotonic()
    # Most recent failure count seen by the failure hook; reported with errors.
    retries_seen = 0

    def send():
        # Returns the awaitable produced by the send/parse helper; the
        # retry helper is responsible for awaiting it.
        return self._send_command_parse_response(
            conn, command_name, *args, **options
        )

    def on_failure(error, failure_count):
        nonlocal retries_seen
        retries_seen = failure_count
        return self._disconnect_reset_raise_on_watching(
            conn, error, failure_count, started_at, command_name
        )

    try:
        response = await conn.retry.call_with_retry(
            send,
            on_failure,
            with_failure_count=True,
        )

        # Kept inside the try so that a failure while recording the
        # duration is still counted by the error branch below.
        elapsed = time.monotonic() - started_at
        await record_operation_duration(
            command_name=command_name,
            duration_seconds=elapsed,
            server_address=getattr(conn, "host", None),
            server_port=getattr(conn, "port", None),
            db_namespace=str(conn.db),
        )
    except Exception as exc:
        # Server and network-peer fields are both filled from the
        # connection's host/port.
        host = getattr(conn, "host", None)
        port = getattr(conn, "port", None)
        await record_error_count(
            server_address=host,
            server_port=port,
            network_peer_address=host,
            network_peer_port=port,
            error_type=exc,
            retry_attempts=retries_seen,
            is_internal=False,
        )
        raise
    else:
        return response
| 1852 | |
| 1853 | def pipeline_execute_command(self, *args, **options): |
| 1854 | """ |
no test coverage detected