Subscribe to shard channels. Channels supplied as keyword arguments expect a channel name as the key and a callable as the value. ``Subscription`` objects can also be supplied positionally with an optional handler.
(
self, *args: ChannelT | Subscription, **kwargs: PubSubHandler
)
| 3064 | return message |
| 3065 | |
| 3066 | def ssubscribe( |
| 3067 | self, *args: ChannelT | Subscription, **kwargs: PubSubHandler |
| 3068 | ) -> None: |
| 3069 | """ |
| 3070 | Subscribe to shard channels. |
| 3071 | |
| 3072 | Channels supplied as keyword arguments expect a channel name as the key |
| 3073 | and a callable as the value. ``Subscription`` objects can also be |
| 3074 | supplied positionally with an optional handler. |
| 3075 | """ |
| 3076 | s_channels = parse_pubsub_subscriptions(args, kwargs) |
| 3077 | # Serialize against reinitialize_shard_subscriptions (worker thread) |
| 3078 | # so the reverse index, shard_channels, and node_pubsub_mapping are |
| 3079 | # not mutated concurrently. |
| 3080 | with self._shard_state_lock: |
| 3081 | for s_channel, handler in s_channels.items(): |
| 3082 | node = self.cluster.get_node_from_key(s_channel) |
| 3083 | if not node: |
| 3084 | continue |
| 3085 | # Lazy re-route: if this channel is already tracked against a |
| 3086 | # different node (e.g. after a slot migration), migrate it now |
| 3087 | # so the caller's intent is applied on the current owner. |
| 3088 | normalized_key = next(iter(self._normalize_keys({s_channel: None}))) |
| 3089 | old_name = self._shard_channel_to_node.get(normalized_key) |
| 3090 | if old_name and old_name != node.name: |
| 3091 | # Match PubSub.ssubscribe() dict.update() semantics: the |
| 3092 | # caller's newly supplied handler (including None) always |
| 3093 | # overrides any previously registered handler. |
| 3094 | self._migrate_shard_channel( |
| 3095 | normalized_key, |
| 3096 | handler, |
| 3097 | old_name, |
| 3098 | node, |
| 3099 | ) |
| 3100 | continue |
| 3101 | pubsub = self._get_node_pubsub(node) |
| 3102 | if handler: |
| 3103 | pubsub.ssubscribe(Subscription(s_channel, handler)) |
| 3104 | else: |
| 3105 | pubsub.ssubscribe(s_channel) |
| 3106 | self.shard_channels.update(pubsub.shard_channels) |
| 3107 | self._shard_channel_to_node[normalized_key] = node.name |
| 3108 | self.pending_unsubscribe_shard_channels.difference_update( |
| 3109 | self._normalize_keys({s_channel: None}) |
| 3110 | ) |
| 3111 | if pubsub.subscribed and not self.subscribed: |
| 3112 | self.subscribed_event.set() |
| 3113 | self.health_check_response_counter = 0 |
| 3114 | |
| 3115 | def sunsubscribe(self, *args): |
| 3116 | if args: |
no test coverage detected