Send a command to a node in the cluster
(self, target_node, *args, **kwargs)
| 1628 | raise e |
| 1629 | |
| 1630 | def _execute_command(self, target_node, *args, **kwargs): |
| 1631 | """ |
| 1632 | Send a command to a node in the cluster |
| 1633 | """ |
| 1634 | command = args[0] |
| 1635 | redis_node = None |
| 1636 | connection = None |
| 1637 | redirect_addr = None |
| 1638 | asking = False |
| 1639 | moved = False |
| 1640 | ttl = int(self.RedisClusterRequestTTL) |
| 1641 | |
| 1642 | # Start timing for observability |
| 1643 | start_time = time.monotonic() |
| 1644 | |
| 1645 | while ttl > 0: |
| 1646 | ttl -= 1 |
| 1647 | try: |
| 1648 | if asking: |
| 1649 | target_node = self.get_node(node_name=redirect_addr) |
| 1650 | elif moved: |
| 1651 | # MOVED occurred and the slots cache was updated, |
| 1652 | # refresh the target node |
| 1653 | slot = self.determine_slot(*args) |
| 1654 | target_node = self.nodes_manager.get_node_from_slot( |
| 1655 | slot, |
| 1656 | self.read_from_replicas and command in READ_COMMANDS, |
| 1657 | self.load_balancing_strategy |
| 1658 | if command in READ_COMMANDS |
| 1659 | else None, |
| 1660 | ) |
| 1661 | moved = False |
| 1662 | |
| 1663 | redis_node = self.get_redis_connection(target_node) |
| 1664 | connection = get_connection(redis_node) |
| 1665 | if asking: |
| 1666 | connection.send_command("ASKING") |
| 1667 | redis_node.parse_response(connection, "ASKING", **kwargs) |
| 1668 | asking = False |
| 1669 | connection.send_command(*args, **kwargs) |
| 1670 | response = redis_node.parse_response(connection, command, **kwargs) |
| 1671 | |
| 1672 | # Remove the "keys" entry; it is needed only for the cache. |
| 1673 | kwargs.pop("keys", None) |
| 1674 | |
| 1675 | if command in self.cluster_response_callbacks: |
| 1676 | response = self.cluster_response_callbacks[command]( |
| 1677 | response, **kwargs |
| 1678 | ) |
| 1679 | |
| 1680 | self._record_command_metric( |
| 1681 | command_name=command, |
| 1682 | duration_seconds=time.monotonic() - start_time, |
| 1683 | connection=connection, |
| 1684 | ) |
| 1685 | return response |
| 1686 | except AuthenticationError as e: |
| 1687 | e.connection = connection if connection is not None else target_node |
no test coverage detected