Gets a node that servers this hash slot
(
self,
slot: int,
read_from_replicas: bool = False,
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
server_type: Optional[Literal["primary", "replica"]] = None,
)
| 2258 | version="5.3.0", |
| 2259 | ) |
| 2260 | def get_node_from_slot( |
| 2261 | self, |
| 2262 | slot: int, |
| 2263 | read_from_replicas: bool = False, |
| 2264 | load_balancing_strategy: Optional[LoadBalancingStrategy] = None, |
| 2265 | server_type: Optional[Literal["primary", "replica"]] = None, |
| 2266 | ) -> ClusterNode: |
| 2267 | """ |
| 2268 | Gets a node that servers this hash slot |
| 2269 | """ |
| 2270 | |
| 2271 | if read_from_replicas is True and load_balancing_strategy is None: |
| 2272 | load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN |
| 2273 | |
| 2274 | with self._lock: |
| 2275 | if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0: |
| 2276 | raise SlotNotCoveredError( |
| 2277 | f'Slot "{slot}" not covered by the cluster. ' |
| 2278 | + f'"require_full_coverage={self._require_full_coverage}"' |
| 2279 | ) |
| 2280 | |
| 2281 | if len(self.slots_cache[slot]) > 1 and load_balancing_strategy: |
| 2282 | # get the server index using the strategy defined in load_balancing_strategy |
| 2283 | primary_name = self.slots_cache[slot][0].name |
| 2284 | node_idx = self.read_load_balancer.get_server_index( |
| 2285 | primary_name, len(self.slots_cache[slot]), load_balancing_strategy |
| 2286 | ) |
| 2287 | elif ( |
| 2288 | server_type is None |
| 2289 | or server_type == PRIMARY |
| 2290 | or len(self.slots_cache[slot]) == 1 |
| 2291 | ): |
| 2292 | # return a primary |
| 2293 | node_idx = 0 |
| 2294 | else: |
| 2295 | # return a replica |
| 2296 | # randomly choose one of the replicas |
| 2297 | node_idx = random.randint(1, len(self.slots_cache[slot]) - 1) |
| 2298 | |
| 2299 | return self.slots_cache[slot][node_idx] |
| 2300 | |
| 2301 | def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]): |
| 2302 | """ |