MCPcopy
hub / github.com/redis/redis-py / _send_cluster_commands

Method _send_cluster_commands

redis/cluster.py:4062–4302  ·  view source on GitHub ↗

Send a bunch of cluster commands to the redis cluster. `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses automatically. If set to false it will raise RedisClusterException.

(
        self, stack, raise_on_error=True, allow_redirections=True
    )

Source from the content-addressed store, hash-verified

4060 raise e
4061
4062 def _send_cluster_commands(
4063 self, stack, raise_on_error=True, allow_redirections=True
4064 ):
4065 """
4066 Send a bunch of cluster commands to the redis cluster.
4067
4068 `allow_redirections` If the pipeline should follow
4069 `ASK` & `MOVED` responses automatically. If set
4070 to false it will raise RedisClusterException.
4071 """
4072 # the first time sending the commands we send all of
4073 # the commands that were queued up.
4074 # if we have to run through it again, we only retry
4075 # the commands that failed.
4076 attempt = sorted(stack, key=lambda x: x.position)
4077 is_default_node = False
4078 # build a list of node objects based on node names we need to
4079 nodes: dict[str, NodeCommands] = {}
4080 nodes_written = 0
4081 nodes_read = 0
4082
4083 try:
4084 # as we move through each command that still needs to be processed,
4085 # we figure out the slot number that command maps to, then from
4086 # the slot determine the node.
4087 for c in attempt:
4088 command_policies = self._pipe._policy_resolver.resolve(
4089 c.args[0].lower()
4090 )
4091 # refer to our internal node -> slot table that
4092 # tells us where a given command should route to.
4093 # (it might be possible we have a cached node that no longer
4094 # exists in the cluster, which is why we do this in a loop)
4095 passed_targets = c.options.pop("target_nodes", None)
4096 if passed_targets and not self._is_nodes_flag(passed_targets):
4097 target_nodes = self._parse_target_nodes(passed_targets)
4098
4099 if not command_policies:
4100 command_policies = CommandPolicies()
4101 else:
4102 if not command_policies:
4103 command = c.args[0].upper()
4104 if (
4105 len(c.args) >= 2
4106 and f"{c.args[0]} {c.args[1]}".upper()
4107 in self._pipe.command_flags
4108 ):
4109 command = f"{c.args[0]} {c.args[1]}".upper()
4110
4111 # We only could resolve key properties if command is not
4112 # in a list of pre-defined request policies
4113 command_flag = self.command_flags.get(command)
4114 if not command_flag:
4115 # Fallback to default policy
4116 if not self._pipe.get_default_node():
4117 keys = None
4118 else:
4119 keys = self._pipe._get_command_keys(*c.args)

Callers 1

send_cluster_commandsMethod · 0.95

Calls 15

_is_nodes_flagMethod · 0.95
_parse_target_nodesMethod · 0.95
_determine_nodesMethod · 0.95
_raise_first_errorMethod · 0.95
CommandPoliciesClass · 0.90
get_connectionFunction · 0.85
NodeCommandsClass · 0.85
_get_command_keysMethod · 0.80
replace_default_nodeMethod · 0.80
monotonicMethod · 0.80

Tested by

no test coverage detected