Send a bunch of cluster commands to the redis cluster. `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses automatically. If set to false it will raise RedisClusterException.
(
self, stack, raise_on_error=True, allow_redirections=True
)
| 4060 | raise e |
| 4061 | |
| 4062 | def _send_cluster_commands( |
| 4063 | self, stack, raise_on_error=True, allow_redirections=True |
| 4064 | ): |
| 4065 | """ |
| 4066 | Send a bunch of cluster commands to the redis cluster. |
| 4067 | |
| 4068 | `allow_redirections` If the pipeline should follow |
| 4069 | `ASK` & `MOVED` responses automatically. If set |
| 4070 | to false it will raise RedisClusterException. |
| 4071 | """ |
| 4072 | # the first time sending the commands we send all of |
| 4073 | # the commands that were queued up. |
| 4074 | # if we have to run through it again, we only retry |
| 4075 | # the commands that failed. |
| 4076 | attempt = sorted(stack, key=lambda x: x.position) |
| 4077 | is_default_node = False |
| 4078 | # build a list of node objects based on node names we need to |
| 4079 | nodes: dict[str, NodeCommands] = {} |
| 4080 | nodes_written = 0 |
| 4081 | nodes_read = 0 |
| 4082 | |
| 4083 | try: |
| 4084 | # as we move through each command that still needs to be processed, |
| 4085 | # we figure out the slot number that command maps to, then from |
| 4086 | # the slot determine the node. |
| 4087 | for c in attempt: |
| 4088 | command_policies = self._pipe._policy_resolver.resolve( |
| 4089 | c.args[0].lower() |
| 4090 | ) |
| 4091 | # refer to our internal node -> slot table that |
| 4092 | # tells us where a given command should route to. |
| 4093 | # (it might be possible we have a cached node that no longer |
| 4094 | # exists in the cluster, which is why we do this in a loop) |
| 4095 | passed_targets = c.options.pop("target_nodes", None) |
| 4096 | if passed_targets and not self._is_nodes_flag(passed_targets): |
| 4097 | target_nodes = self._parse_target_nodes(passed_targets) |
| 4098 | |
| 4099 | if not command_policies: |
| 4100 | command_policies = CommandPolicies() |
| 4101 | else: |
| 4102 | if not command_policies: |
| 4103 | command = c.args[0].upper() |
| 4104 | if ( |
| 4105 | len(c.args) >= 2 |
| 4106 | and f"{c.args[0]} {c.args[1]}".upper() |
| 4107 | in self._pipe.command_flags |
| 4108 | ): |
| 4109 | command = f"{c.args[0]} {c.args[1]}".upper() |
| 4110 | |
| 4111 | # We only could resolve key properties if command is not |
| 4112 | # in a list of pre-defined request policies |
| 4113 | command_flag = self.command_flags.get(command) |
| 4114 | if not command_flag: |
| 4115 | # Fallback to default policy |
| 4116 | if not self._pipe.get_default_node(): |
| 4117 | keys = None |
| 4118 | else: |
| 4119 | keys = self._pipe._get_command_keys(*c.args) |
no test coverage detected