Execute all the commands in the current pipeline
(self, raise_on_error: bool = True)
| 2043 | ) |
| 2044 | |
| 2045 | async def execute(self, raise_on_error: bool = True) -> List[Any]: |
| 2046 | """Execute all the commands in the current pipeline""" |
| 2047 | stack = self.command_stack |
| 2048 | if not stack and not self.watching: |
| 2049 | return [] |
| 2050 | if self.scripts: |
| 2051 | await self.load_scripts() |
| 2052 | if self.is_transaction or self.explicit_transaction: |
| 2053 | execute = self._execute_transaction |
| 2054 | operation_name = "MULTI" |
| 2055 | else: |
| 2056 | execute = self._execute_pipeline |
| 2057 | operation_name = "PIPELINE" |
| 2058 | |
| 2059 | conn = self.connection |
| 2060 | if not conn: |
| 2061 | conn = await self.connection_pool.get_connection() |
| 2062 | # assign to self.connection so reset() releases the connection |
| 2063 | # back to the pool after we're done |
| 2064 | self.connection = conn |
| 2065 | conn = cast(Connection, conn) |
| 2066 | |
| 2067 | # Start timing for observability |
| 2068 | start_time = time.monotonic() |
| 2069 | # Track actual retry attempts for error reporting |
| 2070 | actual_retry_attempts = 0 |
| 2071 | |
| 2072 | def failure_callback(error, failure_count): |
| 2073 | nonlocal actual_retry_attempts |
| 2074 | actual_retry_attempts = failure_count |
| 2075 | return self._disconnect_raise_on_watching( |
| 2076 | conn, error, failure_count, start_time, operation_name |
| 2077 | ) |
| 2078 | |
| 2079 | try: |
| 2080 | response = await conn.retry.call_with_retry( |
| 2081 | lambda: execute(conn, stack, raise_on_error), |
| 2082 | failure_callback, |
| 2083 | with_failure_count=True, |
| 2084 | ) |
| 2085 | |
| 2086 | await record_operation_duration( |
| 2087 | command_name=operation_name, |
| 2088 | duration_seconds=time.monotonic() - start_time, |
| 2089 | server_address=getattr(conn, "host", None), |
| 2090 | server_port=getattr(conn, "port", None), |
| 2091 | db_namespace=str(conn.db), |
| 2092 | ) |
| 2093 | return response |
| 2094 | except Exception as e: |
| 2095 | await record_error_count( |
| 2096 | server_address=getattr(conn, "host", None), |
| 2097 | server_port=getattr(conn, "port", None), |
| 2098 | network_peer_address=getattr(conn, "host", None), |
| 2099 | network_peer_port=getattr(conn, "port", None), |
| 2100 | error_type=e, |
| 2101 | retry_attempts=actual_retry_attempts, |
| 2102 | is_internal=False, |
no test coverage detected