Execute a command and return a parsed response
(self, *args, **options)
| 816 | |
| 817 | # COMMAND EXECUTION AND PROTOCOL PARSING |
async def execute_command(self, *args, **options):
    """
    Send one command over a connection and return its parsed reply.

    ``args[0]`` is taken as the command name. The client's own
    ``self.connection`` is used when it is set; otherwise a connection is
    obtained from ``self.connection_pool`` and released back to it before
    this coroutine exits, whether the call succeeded or raised.

    The send/parse step is driven through the connection's retry object.
    On success the elapsed time is reported via
    ``record_operation_duration``; any ``Exception`` is reported via
    ``record_error_count`` (with the last failure count seen by the retry
    callback) and then re-raised unchanged.
    """
    await self.initialize()
    connection_pool = self.connection_pool
    name = args[0]

    # Prefer the client's dedicated connection; borrow one only if absent.
    connection = self.connection
    if not connection:
        connection = await connection_pool.get_connection()

    # Reference point for the duration metric.
    started_at = time.monotonic()
    # Last failure count handed to the retry callback; reported on error.
    failures_seen = 0

    def on_failure(error, failure_count):
        nonlocal failures_seen
        failures_seen = failure_count
        return self._close_connection(
            connection, error, failure_count, started_at, name
        )

    def send_once():
        # Returns the coroutine for a single send + parse attempt.
        return self._send_command_parse_response(
            connection, name, *args, **options
        )

    if self.single_connection_client:
        await self._single_conn_lock.acquire()
    try:
        reply = await connection.retry.call_with_retry(
            send_once,
            on_failure,
            with_failure_count=True,
        )

        # Kept inside the try block so a failure while recording the
        # duration is still routed through the error handler below.
        elapsed = time.monotonic() - started_at
        await record_operation_duration(
            command_name=name,
            duration_seconds=elapsed,
            server_address=getattr(connection, "host", None),
            server_port=getattr(connection, "port", None),
            db_namespace=str(connection.db),
        )
        return reply
    except Exception as exc:
        host = getattr(connection, "host", None)
        port = getattr(connection, "port", None)
        await record_error_count(
            server_address=host,
            server_port=port,
            network_peer_address=host,
            network_peer_port=port,
            error_type=exc,
            retry_attempts=failures_seen,
            is_internal=False,
        )
        raise
    finally:
        # Two independent cleanups: drop the lock if this client uses one,
        # and give a borrowed connection back to the pool.
        if self.single_connection_client:
            self._single_conn_lock.release()
        if not self.connection:
            await connection_pool.release(connection)
| 872 | |
| 873 | async def parse_response( |
| 874 | self, connection: Connection, command_name: Union[str, bytes], **options |
no test coverage detected