Reconcile per-node shard subscriptions against the cluster's current slot ownership map. For each tracked shard channel whose owning node has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on the old node's pubsub and ssubscribe on the new owner's pubsub, preserving any registered handler.
(self)
| 3709 | ) |
| 3710 | |
| 3711 | async def reinitialize_shard_subscriptions(self) -> None: |
| 3712 | """ |
| 3713 | Reconcile per-node shard subscriptions against the cluster's current |
| 3714 | slot ownership map. For each tracked shard channel whose owning node |
| 3715 | has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on |
| 3716 | the old node's pubsub and ssubscribe on the new owner's pubsub, |
| 3717 | preserving any registered handler. |
| 3718 | """ |
| 3719 | uncovered: list = [] |
| 3720 | made_progress = False |
| 3721 | first_migrate_error: Optional[BaseException] = None |
| 3722 | async with self._shard_state_lock: |
| 3723 | for channel, handler in list(self.shard_channels.items()): |
| 3724 | try: |
| 3725 | new_node = self.cluster.get_node_from_key(channel) |
| 3726 | except SlotNotCoveredError: |
| 3727 | # Slot is transiently uncovered (mid-migration / partial |
| 3728 | # topology refresh). Defer this channel so coverable |
| 3729 | # siblings still reconcile this pass; we surface the |
| 3730 | # error below so the caller (and logs) know not every |
| 3731 | # channel was reconciled. Retry happens on the next |
| 3732 | # slots-cache change notification. |
| 3733 | uncovered.append(channel) |
| 3734 | continue |
| 3735 | old_name = self._shard_channel_to_node.get(channel) |
| 3736 | if old_name == new_node.name: |
| 3737 | continue |
| 3738 | try: |
| 3739 | await self._migrate_shard_channel( |
| 3740 | channel, handler, old_name, new_node |
| 3741 | ) |
| 3742 | made_progress = True |
| 3743 | except (ConnectionError, TimeoutError, OSError) as e: |
| 3744 | # Transient connectivity error while subscribing on the |
| 3745 | # new owner (or unsubscribing on the old owner if its |
| 3746 | # handler chose to re-raise). Do not abort reconciliation |
| 3747 | # for sibling channels: _shard_channel_to_node was not |
| 3748 | # advanced for this channel, so the next slots-cache |
| 3749 | # change notification will retry it. |
| 3750 | logger.warning( |
| 3751 | "shard channel %r migration deferred: %s: %s", |
| 3752 | channel, |
| 3753 | type(e).__name__, |
| 3754 | e, |
| 3755 | ) |
| 3756 | if first_migrate_error is None: |
| 3757 | first_migrate_error = e |
| 3758 | continue |
| 3759 | # Garbage-collect per-node pubsubs that no longer hold any |
| 3760 | # subscription so their connections are released. |
| 3761 | for name, pubsub in list(self.node_pubsub_mapping.items()): |
| 3762 | if not pubsub.subscribed: |
| 3763 | try: |
| 3764 | await pubsub.aclose() |
| 3765 | except Exception: |
| 3766 | pass |
| 3767 | self.node_pubsub_mapping.pop(name, None) |
| 3768 | if uncovered: |
no test coverage detected