Execute a raw command on the appropriate cluster node or target_nodes. It will retry the command as specified by the retries property of the :attr:`retry` & then raise an exception. :param args: | Raw command args :param kwargs: - t
(self, *args: EncodableT, **kwargs: Any)
| 950 | ) |
| 951 | |
| 952 | async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: |
| 953 | """ |
| 954 | Execute a raw command on the appropriate cluster node or target_nodes. |
| 955 | |
| 956 | It will retry the command as specified by the retries property of |
| 957 | the :attr:`retry` & then raise an exception. |
| 958 | |
| 959 | :param args: |
| 960 | | Raw command args |
| 961 | :param kwargs: |
| 962 | |
| 963 | - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode` |
| 964 | or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`] |
| 965 | - Rest of the kwargs are passed to the Redis connection |
| 966 | |
| 967 | :raises RedisClusterException: if target_nodes is not provided & the command |
| 968 | can't be mapped to a slot |
| 969 | """ |
| 970 | command = args[0] |
| 971 | target_nodes = [] |
| 972 | target_nodes_specified = False |
| 973 | retry_attempts = self.retry.get_retries() |
| 974 | |
| 975 | passed_targets = kwargs.pop("target_nodes", None) |
| 976 | if passed_targets and not self._is_node_flag(passed_targets): |
| 977 | target_nodes = self._parse_target_nodes(passed_targets) |
| 978 | target_nodes_specified = True |
| 979 | retry_attempts = 0 |
| 980 | |
| 981 | command_policies = await self._policy_resolver.resolve(args[0].lower()) |
| 982 | |
| 983 | if not command_policies and not target_nodes_specified: |
| 984 | command_flag = self.command_flags.get(command) |
| 985 | if not command_flag: |
| 986 | # Fallback to default policy |
| 987 | if not self.get_default_node(): |
| 988 | slot = None |
| 989 | else: |
| 990 | slot = await self._determine_slot(*args) |
| 991 | if slot is None: |
| 992 | command_policies = CommandPolicies() |
| 993 | else: |
| 994 | command_policies = CommandPolicies( |
| 995 | request_policy=RequestPolicy.DEFAULT_KEYED, |
| 996 | response_policy=ResponsePolicy.DEFAULT_KEYED, |
| 997 | ) |
| 998 | else: |
| 999 | if command_flag in self._command_flags_mapping: |
| 1000 | command_policies = CommandPolicies( |
| 1001 | request_policy=self._command_flags_mapping[command_flag] |
| 1002 | ) |
| 1003 | else: |
| 1004 | command_policies = CommandPolicies() |
| 1005 | elif not command_policies and target_nodes_specified: |
| 1006 | command_policies = CommandPolicies() |
| 1007 | |
| 1008 | # Add one for the first execution |
| 1009 | execute_attempts = 1 + retry_attempts |
nothing calls this directly
no test coverage detected