(
self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
)
| 1096 | raise e |
| 1097 | |
| 1098 | async def _execute_command( |
| 1099 | self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any |
| 1100 | ) -> Any: |
| 1101 | asking = moved = False |
| 1102 | redirect_addr = None |
| 1103 | ttl = self.RedisClusterRequestTTL |
| 1104 | command = args[0] |
| 1105 | start_time = time.monotonic() |
| 1106 | |
| 1107 | while ttl > 0: |
| 1108 | ttl -= 1 |
| 1109 | try: |
| 1110 | if asking: |
| 1111 | target_node = self.get_node(node_name=redirect_addr) |
| 1112 | await target_node.execute_command("ASKING") |
| 1113 | asking = False |
| 1114 | elif moved: |
| 1115 | # MOVED occurred and the slots cache was updated, |
| 1116 | # refresh the target node |
| 1117 | slot = await self._determine_slot(*args) |
| 1118 | target_node = self.nodes_manager.get_node_from_slot( |
| 1119 | slot, |
| 1120 | self.read_from_replicas and args[0] in READ_COMMANDS, |
| 1121 | self.load_balancing_strategy |
| 1122 | if args[0] in READ_COMMANDS |
| 1123 | else None, |
| 1124 | ) |
| 1125 | moved = False |
| 1126 | |
| 1127 | response = await target_node.execute_command(*args, **kwargs) |
| 1128 | await self._record_command_metric( |
| 1129 | command_name=command, |
| 1130 | duration_seconds=time.monotonic() - start_time, |
| 1131 | connection=target_node, |
| 1132 | ) |
| 1133 | return response |
| 1134 | except BusyLoadingError as e: |
| 1135 | e.connection = target_node |
| 1136 | await self._record_command_metric( |
| 1137 | command_name=command, |
| 1138 | duration_seconds=time.monotonic() - start_time, |
| 1139 | connection=target_node, |
| 1140 | error=e, |
| 1141 | ) |
| 1142 | raise |
| 1143 | except MaxConnectionsError as e: |
| 1144 | # MaxConnectionsError indicates client-side resource exhaustion |
| 1145 | # (too many connections in the pool), not a node failure. |
| 1146 | # Don't treat this as a node failure - just re-raise the error |
| 1147 | # without reinitializing the cluster. |
| 1148 | e.connection = target_node |
| 1149 | await self._record_command_metric( |
| 1150 | command_name=command, |
| 1151 | duration_seconds=time.monotonic() - start_time, |
| 1152 | connection=target_node, |
| 1153 | error=e, |
| 1154 | ) |
| 1155 | raise |
no test coverage detected