MCPcopy
hub / github.com/redis/redis-py / _migrate_shard_channel

Method _migrate_shard_channel

redis/asyncio/cluster.py:3786–3835  ·  view source on GitHub ↗
(
        self,
        channel: Any,
        handler: Optional[Callable],
        old_name: Optional[str],
        new_node: "ClusterNode",
    )

Source from the content-addressed store, hash-verified

3784 raise first_migrate_error
3785
3786 async def _migrate_shard_channel(
3787 self,
3788 channel: Any,
3789 handler: Optional[Callable],
3790 old_name: Optional[str],
3791 new_node: "ClusterNode",
3792 ) -> None:
3793 # Detach from the old per-node pubsub, best-effort: the old node may
3794 # already be unreachable during migration / failover.
3795 if old_name and old_name in self.node_pubsub_mapping:
3796 old_pubsub = self.node_pubsub_mapping[old_name]
3797 try:
3798 await old_pubsub.sunsubscribe(channel)
3799 except (ConnectionError, TimeoutError, OSError):
3800 # redis-py's Connection has already called ``disconnect()``
3801 # before raising (see Connection.read_response /
3802 # send_packed_command with ``disconnect_on_error=True``),
3803 # so ``old_pubsub``'s dedicated socket is gone. Two cases:
3804 #
3805 # 1. The old node is no longer in the cluster topology
3806 # (e.g. removed by failover / topology refresh): no
3807 # reconnect target exists, so ``old_pubsub.subscribed``
3808 # would stay True forever and the end-of-pass GC block
3809 # would skip it. Drop it eagerly so the round-robin
3810 # generator does not keep yielding a dead pubsub that
3811 # produces periodic errors from ``get_sharded_message``.
3812 # 2. The old node is still known (transiently slow /
3813 # unreachable): ``PubSub._execute`` auto-reconnects and
3814 # ``on_connect`` re-subscribes to remaining channels,
3815 # so other subscriptions on the same pubsub recover
3816 # naturally. Leave it alone.
3817 if self.cluster.get_node(node_name=old_name) is None:
3818 try:
3819 await old_pubsub.aclose()
3820 except Exception:
3821 pass
3822 self.node_pubsub_mapping.pop(old_name, None)
3823 # Attach to the new per-node pubsub, preserving the handler. Decode to
3824 # a text key only when we must pass it as a kwarg (handler present).
3825 new_pubsub = self._get_node_pubsub(new_node)
3826 if handler:
3827 await new_pubsub.ssubscribe(Subscription(channel, handler))
3828 else:
3829 await new_pubsub.ssubscribe(channel)
3830 self.shard_channels.update(new_pubsub.shard_channels)
3831 normalized_key = next(iter(self._normalize_keys({channel: None})))
3832 self._shard_channel_to_node[normalized_key] = new_node.name
3833 self.pending_unsubscribe_shard_channels.difference_update(
3834 self._normalize_keys({channel: None})
3835 )
3836
3837 async def on_slots_changed(self) -> None:
3838 # Observer hook invoked by NodesManager after a slots-cache refresh.

Callers 2

ssubscribeMethod · 0.95

Calls 8

_get_node_pubsubMethod · 0.95
SubscriptionClass · 0.90
updateMethod · 0.80
sunsubscribeMethod · 0.45
get_nodeMethod · 0.45
acloseMethod · 0.45
ssubscribeMethod · 0.45
_normalize_keysMethod · 0.45

Tested by

no test coverage detected