| 2956 | self._resubscribe(subscriptions, self.ssubscribe) |
| 2957 | |
| 2958 | def _get_node_pubsub(self, node): |
| 2959 | try: |
| 2960 | return self.node_pubsub_mapping[node.name] |
| 2961 | except KeyError: |
| 2962 | redis_connection = self.cluster.get_redis_connection(node) |
| 2963 | pubsub = redis_connection.pubsub( |
| 2964 | push_handler_func=self.push_handler_func, |
| 2965 | ) |
| 2966 | # Replay shard subscriptions on reconnect with slot-aware grouping |
| 2967 | # so that channels spanning multiple slots owned by this node do |
| 2968 | # not trigger a CROSSSLOT error. |
| 2969 | pubsub._resubscribe_shard_channels = MethodType( |
| 2970 | ClusterPubSub._resubscribe_shard_channels, pubsub |
| 2971 | ) |
| 2972 | self.node_pubsub_mapping[node.name] = pubsub |
| 2973 | return pubsub |
| 2974 | |
| 2975 | def _find_node_name_for_pubsub(self, pubsub): |
| 2976 | for node_name, node_pubsub in self.node_pubsub_mapping.items(): |