(self, channel, handler, old_name, new_node)
| 3214 | raise first_migrate_error |
| 3215 | |
| 3216 | def _migrate_shard_channel(self, channel, handler, old_name, new_node): |
| 3217 | # Detach from the old per-node pubsub, best-effort: the old node may |
| 3218 | # already be unreachable during migration / failover. |
| 3219 | if old_name and old_name in self.node_pubsub_mapping: |
| 3220 | old_pubsub = self.node_pubsub_mapping[old_name] |
| 3221 | try: |
| 3222 | old_pubsub.sunsubscribe(channel) |
| 3223 | except (ConnectionError, TimeoutError, OSError): |
| 3224 | # redis-py's Connection has already called ``disconnect()`` |
| 3225 | # before raising (see Connection.read_response / |
| 3226 | # send_packed_command with ``disconnect_on_error=True``), |
| 3227 | # so ``old_pubsub``'s dedicated socket is gone. Two cases: |
| 3228 | # |
| 3229 | # 1. The old node is no longer in the cluster topology |
| 3230 | # (e.g. removed by failover / topology refresh): no |
| 3231 | # reconnect target exists, so ``old_pubsub.subscribed`` |
| 3232 | # would stay True forever and the end-of-pass GC block |
| 3233 | # would skip it. Drop it eagerly so the round-robin |
| 3234 | # generator does not keep yielding a dead pubsub that |
| 3235 | # produces periodic errors from ``get_sharded_message``. |
| 3236 | # 2. The old node is still known (transiently slow / |
| 3237 | # unreachable): ``PubSub._execute`` auto-reconnects and |
| 3238 | # ``on_connect`` re-subscribes to remaining channels, |
| 3239 | # so other subscriptions on the same pubsub recover |
| 3240 | # naturally. Leave it alone. |
| 3241 | if self.cluster.get_node(node_name=old_name) is None: |
| 3242 | try: |
| 3243 | old_pubsub.reset() |
| 3244 | except Exception: |
| 3245 | pass |
| 3246 | self.node_pubsub_mapping.pop(old_name, None) |
| 3247 | # Attach to the new per-node pubsub, preserving the handler. Decode to |
| 3248 | # a text key only when we must pass it as a kwarg (handler present). |
| 3249 | new_pubsub = self._get_node_pubsub(new_node) |
| 3250 | if handler: |
| 3251 | new_pubsub.ssubscribe(Subscription(channel, handler)) |
| 3252 | else: |
| 3253 | new_pubsub.ssubscribe(channel) |
| 3254 | self.shard_channels.update(new_pubsub.shard_channels) |
| 3255 | normalized_key = next(iter(self._normalize_keys({channel: None}))) |
| 3256 | self._shard_channel_to_node[normalized_key] = new_node.name |
| 3257 | self.pending_unsubscribe_shard_channels.difference_update( |
| 3258 | self._normalize_keys({channel: None}) |
| 3259 | ) |
| 3260 | if new_pubsub.subscribed and not self.subscribed: |
| 3261 | self.subscribed_event.set() |
| 3262 | self.health_check_response_counter = 0 |
| 3263 | |
| 3264 | def on_slots_changed(self): |
| 3265 | # Observer hook invoked by NodesManager after a slots-cache refresh. |
no test coverage detected