MCPcopy
hub / github.com/redis/redis-py / reinitialize_shard_subscriptions

Method reinitialize_shard_subscriptions

redis/asyncio/cluster.py:3711–3784  ·  view source on GitHub ↗

Reconcile per-node shard subscriptions against the cluster's current slot ownership map. For each tracked shard channel whose owning node has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on the old node's pubsub and ssubscribe on the new owner's pubs

(self)

Source from the content-addressed store, hash-verified

3709 )
3710
3711 async def reinitialize_shard_subscriptions(self) -> None:
3712 """
3713 Reconcile per-node shard subscriptions against the cluster's current
3714 slot ownership map. For each tracked shard channel whose owning node
3715 has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on
3716 the old node's pubsub and ssubscribe on the new owner's pubsub,
3717 preserving any registered handler.
3718 """
3719 uncovered: list = []
3720 made_progress = False
3721 first_migrate_error: Optional[BaseException] = None
3722 async with self._shard_state_lock:
3723 for channel, handler in list(self.shard_channels.items()):
3724 try:
3725 new_node = self.cluster.get_node_from_key(channel)
3726 except SlotNotCoveredError:
3727 # Slot is transiently uncovered (mid-migration / partial
3728 # topology refresh). Defer this channel so coverable
3729 # siblings still reconcile this pass; we surface the
3730 # error below so the caller (and logs) know not every
3731 # channel was reconciled. Retry happens on the next
3732 # slots-cache change notification.
3733 uncovered.append(channel)
3734 continue
3735 old_name = self._shard_channel_to_node.get(channel)
3736 if old_name == new_node.name:
3737 continue
3738 try:
3739 await self._migrate_shard_channel(
3740 channel, handler, old_name, new_node
3741 )
3742 made_progress = True
3743 except (ConnectionError, TimeoutError, OSError) as e:
3744 # Transient connectivity error while subscribing on the
3745 # new owner (or unsubscribing on the old owner if its
3746 # handler chose to re-raise). Do not abort reconciliation
3747 # for sibling channels: _shard_channel_to_node was not
3748 # advanced for this channel, so the next slots-cache
3749 # change notification will retry it.
3750 logger.warning(
3751 "shard channel %r migration deferred: %s: %s",
3752 channel,
3753 type(e).__name__,
3754 e,
3755 )
3756 if first_migrate_error is None:
3757 first_migrate_error = e
3758 continue
3759 # Garbage-collect per-node pubsubs that no longer hold any
3760 # subscription so their connections are released.
3761 for name, pubsub in list(self.node_pubsub_mapping.items()):
3762 if not pubsub.subscribed:
3763 try:
3764 await pubsub.aclose()
3765 except Exception:
3766 pass
3767 self.node_pubsub_mapping.pop(name, None)
3768 if uncovered:

Callers 1

on_slots_changedMethod · 0.95

Calls 6

SlotNotCoveredErrorClass · 0.90
get_node_from_keyMethod · 0.45
appendMethod · 0.45
getMethod · 0.45
acloseMethod · 0.45

Tested by

no test coverage detected