MCPcopy
hub / github.com/redis/redis-py / ssubscribe

Method ssubscribe

redis/asyncio/cluster.py:3631–3676  ·  view source on GitHub ↗

Subscribe to shard channels. Parameters: `args` — channel names or ``Subscription`` objects; `kwargs` — channel names mapped to their handlers.

(
        self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
    )

Source from the content-addressed store, hash-verified

3629 return message
3630
async def ssubscribe(
    self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
) -> None:
    """
    Subscribe to shard channels on the node the cluster maps each one to.

    Channels for which ``cluster.get_node_from_key`` returns nothing are
    skipped. A channel already recorded against a different node is handed
    to ``_migrate_shard_channel`` instead of being subscribed directly.

    :param args: Channel names or ``Subscription`` objects
    :param kwargs: Channel names with handlers
    """
    requested = parse_pubsub_subscriptions(args, kwargs)

    # Hold the shard-state lock for the whole batch: the background
    # reinitialize_shard_subscriptions task works on the same reverse
    # index, shard_channels and node_pubsub_mapping. asyncio.Lock is not
    # reentrant, and _migrate_shard_channel (called below) does not try to
    # acquire it again.
    async with self._shard_state_lock:
        for s_channel, handler in requested.items():
            owner = self.cluster.get_node_from_key(s_channel)
            if not owner:
                continue

            channel_key = next(iter(self._normalize_keys({s_channel: None})))
            tracked_name = self._shard_channel_to_node.get(channel_key)

            if tracked_name and tracked_name != owner.name:
                # Lazy re-route: the channel is tracked against another
                # node (e.g. after a slot migration), so move it to the
                # current owner now. As with PubSub.ssubscribe()'s
                # dict.update() semantics, the handler passed in this call
                # (even None) replaces whatever was registered before.
                await self._migrate_shard_channel(
                    channel_key,
                    handler,
                    tracked_name,
                    owner,
                )
                continue

            node_pubsub = self._get_node_pubsub(owner)
            # A truthy handler is wrapped in a Subscription; otherwise the
            # channel is passed through as given.
            target = Subscription(s_channel, handler) if handler else s_channel
            await node_pubsub.ssubscribe(target)

            self.shard_channels.update(node_pubsub.shard_channels)
            self._shard_channel_to_node[channel_key] = owner.name
            # The channel is subscribed again, so it must no longer be
            # listed as awaiting an unsubscribe.
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
3677
3678 async def sunsubscribe(self, *args: Any) -> None:
3679 """

Callers 1

Calls 8

_get_node_pubsub — Method · 0.95
Subscription — Class · 0.90
update — Method · 0.80
get_node_from_key — Method · 0.45
_normalize_keys — Method · 0.45
get — Method · 0.45

Tested by

no test coverage detected