(self, error, failure_count)
| 2964 | return output |
| 2965 | |
async def _reinitialize_on_error(self, error, failure_count):
    """
    Clean up after a failed command and refresh cluster state if required.

    When ``error`` carries a ``connection`` attribute, the failure is first
    reported through ``record_error_count``. For slot-redirect and
    connection errors the transaction connection (if any) is disconnected
    and released, the cluster client's ``reinitialize_counter`` is bumped,
    and the nodes manager is re-initialized every ``reinitialize_steps``
    errors. ``self._executing`` is cleared before returning normally.

    :param error: the exception raised while running the command(s).
    :param failure_count: forwarded to ``record_error_count`` as
        ``retry_attempts``.
    :raises WatchError: if keys are being watched, the transaction is
        executing, and ``error`` is one of ``SLOT_REDIRECT_ERRORS``.
        ``self._executing`` is left untouched in that case.
    """
    if hasattr(error, "connection"):
        await record_error_count(
            server_address=error.connection.host,
            server_port=error.connection.port,
            network_peer_address=error.connection.host,
            network_peer_port=error.connection.port,
            error_type=error,
            retry_attempts=failure_count,
            is_internal=True,
        )

    # Membership is tested on the exact class (not isinstance), so
    # subclasses of the listed errors are deliberately not matched.
    exc_class = type(error)
    is_slot_redirect = exc_class in self.SLOT_REDIRECT_ERRORS

    if self._watching and is_slot_redirect and self._executing:
        raise WatchError("Slot rebalancing occurred while watching keys")

    if is_slot_redirect or exc_class in self.CONNECTION_ERRORS:
        if self._transaction_connection and self._transaction_node:
            # Disconnect and release back to pool
            await self._transaction_connection.disconnect()
            self._transaction_node.release(self._transaction_connection)
            self._transaction_connection = None

        cluster_client = self._pipe.cluster_client
        cluster_client.reinitialize_counter += 1
        if (
            cluster_client.reinitialize_steps
            and cluster_client.reinitialize_counter
            % cluster_client.reinitialize_steps
            == 0
        ):
            await cluster_client.nodes_manager.initialize()
            # Reset the counter that was just incremented. The previous
            # code assigned to ``self.reinitialize_counter`` instead, which
            # created an unrelated attribute on this object and left the
            # cluster client's counter growing without bound.
            cluster_client.reinitialize_counter = 0
        elif isinstance(error, AskError):
            await cluster_client.nodes_manager.move_slot(error)

    self._executing = False
| 3006 | |
| 3007 | async def _raise_first_error(self, responses, stack, start_time): |
| 3008 | """ |
no test coverage detected