| 2998 | yield from current_nodes |
| 2999 | |
| 3000 | def get_sharded_message( |
| 3001 | self, ignore_subscribe_messages=False, timeout=0.0, target_node=None |
| 3002 | ): |
| 3003 | if target_node: |
| 3004 | # Use .get(): migration-driven cleanup in the sunsubscribe branch |
| 3005 | # below and reset() both remove entries from node_pubsub_mapping, |
| 3006 | # so a caller polling with target_node may race the cleanup. Match |
| 3007 | # the async counterpart's None-handling rather than raising |
| 3008 | # KeyError. None pubsub falls through to "no message available". |
| 3009 | pubsub = self.node_pubsub_mapping.get(target_node.name) |
| 3010 | if pubsub is not None: |
| 3011 | # Don't pass ignore_subscribe_messages here - let get_sharded_message |
| 3012 | # handle the filtering after processing subscription state changes |
| 3013 | message = pubsub.get_message( |
| 3014 | ignore_subscribe_messages=False, timeout=timeout |
| 3015 | ) |
| 3016 | else: |
| 3017 | message = None |
| 3018 | else: |
| 3019 | pubsub, message = self._sharded_message_generator(timeout=timeout) |
| 3020 | if message is None: |
| 3021 | return None |
| 3022 | # Only sunsubscribe mutates cluster-level shard state; bypassing the |
| 3023 | # lock on the data-message hot path keeps smessage delivery from |
| 3024 | # competing with the reconciliation worker for _shard_state_lock. |
| 3025 | if str_if_bytes(message["type"]) == "sunsubscribe": |
| 3026 | # Serialize state mutation against reinitialize_shard_subscriptions |
| 3027 | # (worker thread). The blocking get_message above intentionally |
| 3028 | # runs outside the lock so reconciliation is not stalled by long |
| 3029 | # polls. |
| 3030 | with self._shard_state_lock: |
| 3031 | if message["channel"] in self.pending_unsubscribe_shard_channels: |
| 3032 | # User-initiated sunsubscribe: drop from cluster-level tracking. |
| 3033 | self.pending_unsubscribe_shard_channels.remove(message["channel"]) |
| 3034 | self.shard_channels.pop(message["channel"], None) |
| 3035 | self._shard_channel_to_node.pop(message["channel"], None) |
| 3036 | # Drop the per-node pubsub that delivered the confirmation once |
| 3037 | # it no longer holds any shard subscriptions, regardless of |
| 3038 | # whether the sunsubscribe was user-initiated or driven by |
| 3039 | # slot-migration reconciliation (_migrate_shard_channel, which |
| 3040 | # intentionally does not add the channel to |
| 3041 | # pending_unsubscribe_shard_channels). This releases the |
| 3042 | # dedicated connection that would otherwise linger. |
| 3043 | # Identifying the receiving pubsub directly (rather than via |
| 3044 | # the cluster's current slot map) is required after slot |
| 3045 | # migration, where the channel's owner is no longer the node |
| 3046 | # that received our original SSUBSCRIBE. |
| 3047 | if pubsub is not None and not pubsub.subscribed: |
| 3048 | name = self._find_node_name_for_pubsub(pubsub) |
| 3049 | if name is not None: |
| 3050 | try: |
| 3051 | pubsub.reset() |
| 3052 | except Exception: |
| 3053 | pass |
| 3054 | self.node_pubsub_mapping.pop(name, None) |
| 3055 | # Mirror PubSub.handle_message: the empty-check belongs in the |
| 3056 | # unsubscribe branch since that is the only path that can |
| 3057 | # reduce shard_channels here. |