MCPcopy
hub / github.com/redis/redis-py / get_sharded_message

Method get_sharded_message

redis/cluster.py:3000–3064  ·  view source on GitHub ↗
(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    )

Source from the content-addressed store, hash-verified

2998 yield from current_nodes
2999
3000 def get_sharded_message(
3001 self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
3002 ):
3003 if target_node:
3004 # Use .get(): migration-driven cleanup in the sunsubscribe branch
3005 # below and reset() both remove entries from node_pubsub_mapping,
3006 # so a caller polling with target_node may race the cleanup. Match
3007 # the async counterpart's None-handling rather than raising
3008 # KeyError. None pubsub falls through to "no message available".
3009 pubsub = self.node_pubsub_mapping.get(target_node.name)
3010 if pubsub is not None:
3011 # Don't pass ignore_subscribe_messages here - let get_sharded_message
3012 # handle the filtering after processing subscription state changes
3013 message = pubsub.get_message(
3014 ignore_subscribe_messages=False, timeout=timeout
3015 )
3016 else:
3017 message = None
3018 else:
3019 pubsub, message = self._sharded_message_generator(timeout=timeout)
3020 if message is None:
3021 return None
3022 # Only sunsubscribe mutates cluster-level shard state; bypassing the
3023 # lock on the data-message hot path keeps smessage delivery from
3024 # competing with the reconciliation worker for _shard_state_lock.
3025 if str_if_bytes(message["type"]) == "sunsubscribe":
3026 # Serialize state mutation against reinitialize_shard_subscriptions
3027 # (worker thread). The blocking get_message above intentionally
3028 # runs outside the lock so reconciliation is not stalled by long
3029 # polls.
3030 with self._shard_state_lock:
3031 if message["channel"] in self.pending_unsubscribe_shard_channels:
3032 # User-initiated sunsubscribe: drop from cluster-level tracking.
3033 self.pending_unsubscribe_shard_channels.remove(message["channel"])
3034 self.shard_channels.pop(message["channel"], None)
3035 self._shard_channel_to_node.pop(message["channel"], None)
3036 # Drop the per-node pubsub that delivered the confirmation once
3037 # it no longer holds any shard subscriptions, regardless of
3038 # whether the sunsubscribe was user-initiated or driven by
3039 # slot-migration reconciliation (_migrate_shard_channel, which
3040 # intentionally does not add the channel to
3041 # pending_unsubscribe_shard_channels). This releases the
3042 # dedicated connection that would otherwise linger.
3043 # Identifying the receiving pubsub directly (rather than via
3044 # the cluster's current slot map) is required after slot
3045 # migration, where the channel's owner is no longer the node
3046 # that received our original SSUBSCRIBE.
3047 if pubsub is not None and not pubsub.subscribed:
3048 name = self._find_node_name_for_pubsub(pubsub)
3049 if name is not None:
3050 try:
3051 pubsub.reset()
3052 except Exception:
3053 pass
3054 self.node_pubsub_mapping.pop(name, None)
3055 # Mirror PubSub.handle_message: the empty-check belongs in the
3056 # unsubscribe branch since that is the only path that can
3057 # reduce shard_channels here.

Calls 8

str_if_bytesFunction · 0.90
removeMethod · 0.80
getMethod · 0.45
get_messageMethod · 0.45
resetMethod · 0.45
clearMethod · 0.45