(
self,
channel: Any,
handler: Optional[Callable],
old_name: Optional[str],
new_node: "ClusterNode",
)
| 3784 | raise first_migrate_error |
| 3785 | |
| 3786 | async def _migrate_shard_channel( |
| 3787 | self, |
| 3788 | channel: Any, |
| 3789 | handler: Optional[Callable], |
| 3790 | old_name: Optional[str], |
| 3791 | new_node: "ClusterNode", |
| 3792 | ) -> None: |
| 3793 | # Detach from the old per-node pubsub, best-effort: the old node may |
| 3794 | # already be unreachable during migration / failover. |
| 3795 | if old_name and old_name in self.node_pubsub_mapping: |
| 3796 | old_pubsub = self.node_pubsub_mapping[old_name] |
| 3797 | try: |
| 3798 | await old_pubsub.sunsubscribe(channel) |
| 3799 | except (ConnectionError, TimeoutError, OSError): |
| 3800 | # redis-py's Connection has already called ``disconnect()`` |
| 3801 | # before raising (see Connection.read_response / |
| 3802 | # send_packed_command with ``disconnect_on_error=True``), |
| 3803 | # so ``old_pubsub``'s dedicated socket is gone. Two cases: |
| 3804 | # |
| 3805 | # 1. The old node is no longer in the cluster topology |
| 3806 | # (e.g. removed by failover / topology refresh): no |
| 3807 | # reconnect target exists, so ``old_pubsub.subscribed`` |
| 3808 | # would stay True forever and the end-of-pass GC block |
| 3809 | # would skip it. Drop it eagerly so the round-robin |
| 3810 | # generator does not keep yielding a dead pubsub that |
| 3811 | # produces periodic errors from ``get_sharded_message``. |
| 3812 | # 2. The old node is still known (transiently slow / |
| 3813 | # unreachable): ``PubSub._execute`` auto-reconnects and |
| 3814 | # ``on_connect`` re-subscribes to remaining channels, |
| 3815 | # so other subscriptions on the same pubsub recover |
| 3816 | # naturally. Leave it alone. |
| 3817 | if self.cluster.get_node(node_name=old_name) is None: |
| 3818 | try: |
| 3819 | await old_pubsub.aclose() |
| 3820 | except Exception: |
| 3821 | pass |
| 3822 | self.node_pubsub_mapping.pop(old_name, None) |
| 3823 | # Attach to the new per-node pubsub, preserving the handler. Decode to |
| 3824 | # a text key only when we must pass it as a kwarg (handler present). |
| 3825 | new_pubsub = self._get_node_pubsub(new_node) |
| 3826 | if handler: |
| 3827 | await new_pubsub.ssubscribe(Subscription(channel, handler)) |
| 3828 | else: |
| 3829 | await new_pubsub.ssubscribe(channel) |
| 3830 | self.shard_channels.update(new_pubsub.shard_channels) |
| 3831 | normalized_key = next(iter(self._normalize_keys({channel: None}))) |
| 3832 | self._shard_channel_to_node[normalized_key] = new_node.name |
| 3833 | self.pending_unsubscribe_shard_channels.difference_update( |
| 3834 | self._normalize_keys({channel: None}) |
| 3835 | ) |
| 3836 | |
| 3837 | async def on_slots_changed(self) -> None: |
| 3838 | # Observer hook invoked by NodesManager after a slots-cache refresh. |
no test coverage detected