MCPcopy
hub / github.com/redis/redis-py / reinitialize_shard_subscriptions

Method reinitialize_shard_subscriptions

redis/cluster.py:3143–3214  ·  view source on GitHub ↗

Reconcile per-node shard subscriptions against the cluster's current slot ownership map. For each tracked shard channel whose owning node has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on the old node's pubsub and ssubscribe on the new owner's pubs

(self)

Source from the content-addressed store, hash-verified

3141 )
3142
3143 def reinitialize_shard_subscriptions(self):
3144 """
3145 Reconcile per-node shard subscriptions against the cluster's current
3146 slot ownership map. For each tracked shard channel whose owning node
3147 has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on
3148 the old node's pubsub and ssubscribe on the new owner's pubsub,
3149 preserving any registered handler.
3150 """
3151 uncovered: list = []
3152 made_progress = False
3153 first_migrate_error: Optional[BaseException] = None
3154 with self._shard_state_lock:
3155 for channel, handler in list(self.shard_channels.items()):
3156 try:
3157 new_node = self.cluster.get_node_from_key(channel)
3158 except SlotNotCoveredError:
3159 # Slot is transiently uncovered (mid-migration / partial
3160 # topology refresh). Defer this channel so coverable
3161 # siblings still reconcile this pass; we surface the
3162 # error below so the caller (and logs) know not every
3163 # channel was reconciled. Retry happens on the next
3164 # slots-cache change notification.
3165 uncovered.append(channel)
3166 continue
3167 old_name = self._shard_channel_to_node.get(channel)
3168 if old_name == new_node.name:
3169 continue
3170 try:
3171 self._migrate_shard_channel(channel, handler, old_name, new_node)
3172 made_progress = True
3173 except (ConnectionError, TimeoutError, OSError) as e:
3174 # Transient connectivity error while subscribing on the
3175 # new owner (or unsubscribing on the old owner if its
3176 # handler chose to re-raise). Do not abort reconciliation
3177 # for sibling channels: _shard_channel_to_node was not
3178 # advanced for this channel, so the next slots-cache
3179 # change notification will retry it.
3180 logger.warning(
3181 "shard channel %r migration deferred: %s: %s",
3182 channel,
3183 type(e).__name__,
3184 e,
3185 )
3186 if first_migrate_error is None:
3187 first_migrate_error = e
3188 continue
3189 # Garbage-collect per-node pubsubs that no longer hold any
3190 # subscription so their connections are released.
3191 for name, pubsub in list(self.node_pubsub_mapping.items()):
3192 if not pubsub.subscribed:
3193 try:
3194 pubsub.reset()
3195 except Exception:
3196 pass
3197 self.node_pubsub_mapping.pop(name, None)
3198 if uncovered:
3199 # Surface the uncovered channels so the caller (and observer
3200 # notification path) knows reconciliation was incomplete. All

Calls 6

SlotNotCoveredErrorClass · 0.90
get_node_from_keyMethod · 0.45
appendMethod · 0.45
getMethod · 0.45
resetMethod · 0.45