Execute a subscribe/unsubscribe command. The code is taken from redis-py and tweaked to make it work within a cluster.
(self, *args)
| 2903 | ) |
| 2904 | |
def execute_command(self, *args):
    """
    Send a subscribe/unsubscribe command over this pubsub's connection.

    Adapted from redis-py's pubsub implementation so that it works
    against a cluster.

    When no connection is held yet, one is acquired first. If no
    connection pool is set either, a node is chosen -- by the hash slot of
    the first channel (``args[1]``) when one is given, otherwise at random
    -- remembered on ``self.node``, and that node's connection pool is
    adopted. The command is then handed to ``self._execute`` together with
    the connection's ``send_command``.
    """
    # NOTE: the response is deliberately not parsed here -- reading it
    # could pull a legitimate message off the stack if the connection is
    # already subscribed to one or more channels

    if self.connection is None:
        if self.connection_pool is None:
            if len(args) <= 1:
                # No channel supplied: any node will do
                target_node = self.cluster.get_random_node()
            else:
                # Route by the first channel: hash it and take one of the
                # nodes holding the resulting slot
                first_channel = args[1]
                slot = self.cluster.keyslot(first_channel)
                target_node = self.cluster.nodes_manager.get_node_from_slot(
                    slot,
                    self.cluster.read_from_replicas,
                    self.cluster.load_balancing_strategy,
                )
            self.node = target_node
            node_client = self.cluster.get_redis_connection(target_node)
            self.connection_pool = node_client.connection_pool

        self.connection = self.connection_pool.get_connection()
        # on reconnect, this callback re-subscribes to whatever channels
        # we were listening to when the connection dropped
        self.connection.register_connect_callback(self.on_connect)
        if self.push_handler_func is not None:
            self.connection._parser.set_pubsub_push_handler(
                self.push_handler_func
            )
        # announce the freshly created pubsub connection to listeners
        instantiation_event = AfterPubSubConnectionInstantiationEvent(
            self.connection, self.connection_pool, ClientType.SYNC, self._lock
        )
        self._event_dispatcher.dispatch(instantiation_event)

    conn = self.connection
    self._execute(conn, conn.send_command, *args)
| 2946 | |
| 2947 | def _resubscribe_shard_channels(self) -> None: |
| 2948 | # A single node can own multiple slot ranges, so a batched |
nothing calls this directly
no test coverage detected