Execute all the commands in the current pipeline
(self, raise_on_error: bool = True)
| 2037 | ) |
| 2038 | |
| 2039 | def execute(self, raise_on_error: bool = True) -> List[Any]: |
| 2040 | """Execute all the commands in the current pipeline""" |
| 2041 | stack = self.command_stack |
| 2042 | if not stack and not self.watching: |
| 2043 | return [] |
| 2044 | if self.scripts: |
| 2045 | self.load_scripts() |
| 2046 | if self.transaction or self.explicit_transaction: |
| 2047 | execute = self._execute_transaction |
| 2048 | operation_name = "MULTI" |
| 2049 | else: |
| 2050 | execute = self._execute_pipeline |
| 2051 | operation_name = "PIPELINE" |
| 2052 | |
| 2053 | conn = self.connection |
| 2054 | if not conn: |
| 2055 | conn = self.connection_pool.get_connection() |
| 2056 | # assign to self.connection so reset() releases the connection |
| 2057 | # back to the pool after we're done |
| 2058 | self.connection = conn |
| 2059 | |
| 2060 | # Start timing for observability |
| 2061 | start_time = time.monotonic() |
| 2062 | # Track actual retry attempts for error reporting |
| 2063 | actual_retry_attempts = [0] |
| 2064 | |
| 2065 | def failure_callback(error, failure_count): |
| 2066 | if is_debug_log_enabled(): |
| 2067 | add_debug_log_for_operation_failure(conn) |
| 2068 | actual_retry_attempts[0] = failure_count |
| 2069 | self._disconnect_raise_on_watching( |
| 2070 | conn, error, failure_count, start_time, operation_name |
| 2071 | ) |
| 2072 | |
| 2073 | try: |
| 2074 | response = conn.retry.call_with_retry( |
| 2075 | lambda: execute(conn, stack, raise_on_error), |
| 2076 | failure_callback, |
| 2077 | with_failure_count=True, |
| 2078 | ) |
| 2079 | |
| 2080 | record_operation_duration( |
| 2081 | command_name=operation_name, |
| 2082 | duration_seconds=time.monotonic() - start_time, |
| 2083 | server_address=getattr(conn, "host", None), |
| 2084 | server_port=getattr(conn, "port", None), |
| 2085 | db_namespace=str(conn.db), |
| 2086 | ) |
| 2087 | return response |
| 2088 | except Exception as e: |
| 2089 | record_error_count( |
| 2090 | server_address=getattr(conn, "host", None), |
| 2091 | server_port=getattr(conn, "port", None), |
| 2092 | network_peer_address=getattr(conn, "host", None), |
| 2093 | network_peer_port=getattr(conn, "port", None), |
| 2094 | error_type=e, |
| 2095 | retry_attempts=actual_retry_attempts[0], |
| 2096 | is_internal=False, |