Execute a command and return a parsed response
(self, *args, **options)
| 798 | return self._execute_command(*args, **options) |
| 799 | |
| 800 | def _execute_command(self, *args, **options): |
| 801 | """Execute a command and return a parsed response""" |
| 802 | pool = self.connection_pool |
| 803 | command_name = args[0] |
| 804 | conn = self.connection or pool.get_connection() |
| 805 | |
| 806 | # Start timing for observability |
| 807 | start_time = time.monotonic() |
| 808 | # Track actual retry attempts for error reporting |
| 809 | actual_retry_attempts = [0] |
| 810 | |
| 811 | def failure_callback(error, failure_count): |
| 812 | if is_debug_log_enabled(): |
| 813 | add_debug_log_for_operation_failure(conn) |
| 814 | actual_retry_attempts[0] = failure_count |
| 815 | self._close_connection(conn, error, failure_count, start_time, command_name) |
| 816 | |
| 817 | if self._single_connection_client: |
| 818 | self.single_connection_lock.acquire() |
| 819 | try: |
| 820 | result = conn.retry.call_with_retry( |
| 821 | lambda: self._send_command_parse_response( |
| 822 | conn, command_name, *args, **options |
| 823 | ), |
| 824 | failure_callback, |
| 825 | with_failure_count=True, |
| 826 | ) |
| 827 | |
| 828 | record_operation_duration( |
| 829 | command_name=command_name, |
| 830 | duration_seconds=time.monotonic() - start_time, |
| 831 | server_address=getattr(conn, "host", None), |
| 832 | server_port=getattr(conn, "port", None), |
| 833 | db_namespace=str(conn.db), |
| 834 | ) |
| 835 | return result |
| 836 | except Exception as e: |
| 837 | record_error_count( |
| 838 | server_address=getattr(conn, "host", None), |
| 839 | server_port=getattr(conn, "port", None), |
| 840 | network_peer_address=getattr(conn, "host", None), |
| 841 | network_peer_port=getattr(conn, "port", None), |
| 842 | error_type=e, |
| 843 | retry_attempts=actual_retry_attempts[0], |
| 844 | is_internal=False, |
| 845 | ) |
| 846 | raise |
| 847 | |
| 848 | finally: |
| 849 | if conn and conn.should_reconnect(): |
| 850 | self._close_connection(conn) |
| 851 | conn.connect() |
| 852 | if self._single_connection_client: |
| 853 | self.single_connection_lock.release() |
| 854 | if not self.connection: |
| 855 | pool.release(conn) |
| 856 | |
| 857 | def parse_response(self, connection, command_name, **options): |