hub / github.com/redis/redis-py / _migrate_shard_channel

Method _migrate_shard_channel

redis/cluster.py:3216–3262
(self, channel, handler, old_name, new_node)

Source from the content-addressed store, hash-verified

            raise first_migrate_error

    def _migrate_shard_channel(self, channel, handler, old_name, new_node):
        # Detach from the old per-node pubsub, best-effort: the old node may
        # already be unreachable during migration / failover.
        if old_name and old_name in self.node_pubsub_mapping:
            old_pubsub = self.node_pubsub_mapping[old_name]
            try:
                old_pubsub.sunsubscribe(channel)
            except (ConnectionError, TimeoutError, OSError):
                # redis-py's Connection has already called ``disconnect()``
                # before raising (see Connection.read_response /
                # send_packed_command with ``disconnect_on_error=True``),
                # so ``old_pubsub``'s dedicated socket is gone. Two cases:
                #
                # 1. The old node is no longer in the cluster topology
                #    (e.g. removed by failover / topology refresh): no
                #    reconnect target exists, so ``old_pubsub.subscribed``
                #    would stay True forever and the end-of-pass GC block
                #    would skip it. Drop it eagerly so the round-robin
                #    generator does not keep yielding a dead pubsub that
                #    produces periodic errors from ``get_sharded_message``.
                # 2. The old node is still known (transiently slow /
                #    unreachable): ``PubSub._execute`` auto-reconnects and
                #    ``on_connect`` re-subscribes to remaining channels,
                #    so other subscriptions on the same pubsub recover
                #    naturally. Leave it alone.
                if self.cluster.get_node(node_name=old_name) is None:
                    try:
                        old_pubsub.reset()
                    except Exception:
                        pass
                    self.node_pubsub_mapping.pop(old_name, None)
        # Attach to the new per-node pubsub, preserving the handler. Decode to
        # a text key only when we must pass it as a kwarg (handler present).
        new_pubsub = self._get_node_pubsub(new_node)
        if handler:
            new_pubsub.ssubscribe(Subscription(channel, handler))
        else:
            new_pubsub.ssubscribe(channel)
        self.shard_channels.update(new_pubsub.shard_channels)
        normalized_key = next(iter(self._normalize_keys({channel: None})))
        self._shard_channel_to_node[normalized_key] = new_node.name
        self.pending_unsubscribe_shard_channels.difference_update(
            self._normalize_keys({channel: None})
        )
        if new_pubsub.subscribed and not self.subscribed:
            self.subscribed_event.set()
            self.health_check_response_counter = 0

    def on_slots_changed(self):
        # Observer hook invoked by NodesManager after a slots-cache refresh.
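
The detach step in _migrate_shard_channel distinguishes two failure cases after a failed sunsubscribe. The following is a minimal sketch of that decision only, not the redis-py implementation. FakePubSub, detach_best_effort, and the known_nodes set (standing in for self.cluster.get_node) are hypothetical names introduced for illustration, and Python's built-in ConnectionError stands in for the redis-py exception of the same name.

```python
class FakePubSub:
    """Hypothetical stand-in for a per-node pubsub; not a redis-py class."""

    def __init__(self, fail=False):
        self.fail = fail
        self.channels = {"orders", "payments"}
        self.was_reset = False

    def sunsubscribe(self, channel):
        if self.fail:
            # Simulates the socket error raised when the old node is unreachable.
            raise ConnectionError("old node unreachable")
        self.channels.discard(channel)

    def reset(self):
        self.channels.clear()
        self.was_reset = True


def detach_best_effort(node_pubsub_mapping, known_nodes, old_name, channel):
    """Mirror the detach control flow; return what happened to the old pubsub.

    known_nodes is an assumption: a set of node names replacing the
    cluster topology lookup used by the real method.
    """
    if not old_name or old_name not in node_pubsub_mapping:
        return "absent"
    old_pubsub = node_pubsub_mapping[old_name]
    try:
        old_pubsub.sunsubscribe(channel)
    except (ConnectionError, TimeoutError, OSError):
        if old_name not in known_nodes:
            # Case 1: the node left the topology, so nothing can reconnect.
            # Reset and drop the pubsub so it is never polled again.
            try:
                old_pubsub.reset()
            except Exception:
                pass
            node_pubsub_mapping.pop(old_name, None)
            return "dropped"
        # Case 2: the node is still known; keep the pubsub so a later
        # reconnect can restore its remaining subscriptions.
        return "kept"
    return "detached"


def demo():
    """Run the three outcomes and collect them for inspection."""
    gone = {"a:6379": FakePubSub(fail=True)}
    slow = {"b:6379": FakePubSub(fail=True)}
    healthy = {"c:6379": FakePubSub(fail=False)}
    return {
        "gone": detach_best_effort(gone, set(), "a:6379", "orders"),
        "gone_mapping": dict(gone),
        "slow": detach_best_effort(slow, {"b:6379"}, "b:6379", "orders"),
        "slow_still_mapped": "b:6379" in slow,
        "healthy": detach_best_effort(healthy, {"c:6379"}, "c:6379", "orders"),
        "healthy_channels": healthy["c:6379"].channels,
    }
```

Calling demo() exercises all three paths: a node removed from the topology, a node that is known but unreachable, and a healthy node. Only the first path removes the entry from the mapping. The second leaves the pubsub in place because, per the source comments, reconnect and re-subscribe are expected to recover its other channels.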

Callers 2

ssubscribe · Method · 0.95

Calls 9

_get_node_pubsub · Method · 0.95
Subscription · Class · 0.90
update · Method · 0.80
sunsubscribe · Method · 0.45
get_node · Method · 0.45
reset · Method · 0.45
ssubscribe · Method · 0.45
_normalize_keys · Method · 0.45
set · Method · 0.45

Tested by

no test coverage detected