MCPcopy
hub / github.com/redis/redis-py / ssubscribe

Method ssubscribe

redis/cluster.py:3066–3113  ·  view source on GitHub ↗

Subscribe to shard channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. ``Subscription`` objects can also be supplied positionally with an optional handler.

(
        self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
    )

Source from the content-addressed store, hash-verified

3064 return message
3065
3066 def ssubscribe(
3067 self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
3068 ) -> None:
3069 """
3070 Subscribe to shard channels.
3071
3072 Channels supplied as keyword arguments expect a channel name as the key
3073 and a callable as the value. ``Subscription`` objects can also be
3074 supplied positionally with an optional handler.
3075 """
3076 s_channels = parse_pubsub_subscriptions(args, kwargs)
3077 # Serialize against reinitialize_shard_subscriptions (worker thread)
3078 # so the reverse index, shard_channels, and node_pubsub_mapping are
3079 # not mutated concurrently.
3080 with self._shard_state_lock:
3081 for s_channel, handler in s_channels.items():
3082 node = self.cluster.get_node_from_key(s_channel)
3083 if not node:
3084 continue
3085 # Lazy re-route: if this channel is already tracked against a
3086 # different node (e.g. after a slot migration), migrate it now
3087 # so the caller's intent is applied on the current owner.
3088 normalized_key = next(iter(self._normalize_keys({s_channel: None})))
3089 old_name = self._shard_channel_to_node.get(normalized_key)
3090 if old_name and old_name != node.name:
3091 # Match PubSub.ssubscribe() dict.update() semantics: the
3092 # caller's newly supplied handler (including None) always
3093 # overrides any previously registered handler.
3094 self._migrate_shard_channel(
3095 normalized_key,
3096 handler,
3097 old_name,
3098 node,
3099 )
3100 continue
3101 pubsub = self._get_node_pubsub(node)
3102 if handler:
3103 pubsub.ssubscribe(Subscription(s_channel, handler))
3104 else:
3105 pubsub.ssubscribe(s_channel)
3106 self.shard_channels.update(pubsub.shard_channels)
3107 self._shard_channel_to_node[normalized_key] = node.name
3108 self.pending_unsubscribe_shard_channels.difference_update(
3109 self._normalize_keys({s_channel: None})
3110 )
3111 if pubsub.subscribed and not self.subscribed:
3112 self.subscribed_event.set()
3113 self.health_check_response_counter = 0
3114
3115 def sunsubscribe(self, *args):
3116 if args:

Callers 1

Calls 9

_get_node_pubsubMethod · 0.95
SubscriptionClass · 0.90
updateMethod · 0.80
get_node_from_keyMethod · 0.45
_normalize_keysMethod · 0.45
getMethod · 0.45
setMethod · 0.45

Tested by

no test coverage detected