Subscribe to shard channels.

:param args: Channel names or ``Subscription`` objects
:param kwargs: Channel names with handlers
(
self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
)
| 3629 | return message |
| 3630 | |
async def ssubscribe(
    self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
) -> None:
    """
    Subscribe to one or more shard channels.

    Every requested channel is resolved to a node through
    ``self.cluster.get_node_from_key``; channels for which no node is
    returned are skipped. A channel already recorded against a different
    node is handed to ``_migrate_shard_channel`` instead of being
    subscribed directly.

    :param args: Channel names or ``Subscription`` objects
    :param kwargs: Channel names with handlers
    """
    requested = parse_pubsub_subscriptions(args, kwargs)

    # The whole batch runs under the shard-state lock so that the
    # background reinitialize_shard_subscriptions task cannot mutate the
    # reverse index, shard_channels, or node_pubsub_mapping at the same
    # time. asyncio.Lock is non-reentrant, so _migrate_shard_channel must
    # not (and does not) take this lock again.
    async with self._shard_state_lock:
        for channel, callback in requested.items():
            target = self.cluster.get_node_from_key(channel)
            if not target:
                continue

            channel_key = next(iter(self._normalize_keys({channel: None})))
            tracked_node = self._shard_channel_to_node.get(channel_key)

            # Lazy re-route: the channel is tracked on another node (for
            # example after a slot migration), so move it to the current
            # owner. As with PubSub.ssubscribe()'s dict.update()
            # semantics, the handler supplied now (None included)
            # replaces whatever was registered before.
            if tracked_node and tracked_node != target.name:
                await self._migrate_shard_channel(
                    channel_key, callback, tracked_node, target
                )
                continue

            node_pubsub = self._get_node_pubsub(target)
            # A handler travels with the channel inside a Subscription;
            # without one the bare channel name is passed through.
            subscription = Subscription(channel, callback) if callback else channel
            await node_pubsub.ssubscribe(subscription)

            # Mirror the node-level state into the cluster-level view and
            # cancel any unsubscribe that was still pending for this
            # channel.
            self.shard_channels.update(node_pubsub.shard_channels)
            self._shard_channel_to_node[channel_key] = target.name
            no_longer_pending = self._normalize_keys({channel: None})
            self.pending_unsubscribe_shard_channels.difference_update(
                no_longer_pending
            )
| 3677 | |
| 3678 | async def sunsubscribe(self, *args: Any) -> None: |
| 3679 | """ |
no test coverage detected