Reconcile per-node shard subscriptions against the cluster's current slot ownership map. For each tracked shard channel whose owning node has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on the old node's pubsub and ssubscribe on the new owner's pubsub, preserving any registered handler.
(self)
| 3141 | ) |
| 3142 | |
| 3143 | def reinitialize_shard_subscriptions(self): |
| 3144 | """ |
| 3145 | Reconcile per-node shard subscriptions against the cluster's current |
| 3146 | slot ownership map. For each tracked shard channel whose owning node |
| 3147 | has changed (e.g. after CLUSTER SETSLOT / failover), sunsubscribe on |
| 3148 | the old node's pubsub and ssubscribe on the new owner's pubsub, |
| 3149 | preserving any registered handler. |
| 3150 | """ |
| 3151 | uncovered: list = [] |
| 3152 | made_progress = False |
| 3153 | first_migrate_error: Optional[BaseException] = None |
| 3154 | with self._shard_state_lock: |
| 3155 | for channel, handler in list(self.shard_channels.items()): |
| 3156 | try: |
| 3157 | new_node = self.cluster.get_node_from_key(channel) |
| 3158 | except SlotNotCoveredError: |
| 3159 | # Slot is transiently uncovered (mid-migration / partial |
| 3160 | # topology refresh). Defer this channel so coverable |
| 3161 | # siblings still reconcile this pass; we surface the |
| 3162 | # error below so the caller (and logs) know not every |
| 3163 | # channel was reconciled. Retry happens on the next |
| 3164 | # slots-cache change notification. |
| 3165 | uncovered.append(channel) |
| 3166 | continue |
| 3167 | old_name = self._shard_channel_to_node.get(channel) |
| 3168 | if old_name == new_node.name: |
| 3169 | continue |
| 3170 | try: |
| 3171 | self._migrate_shard_channel(channel, handler, old_name, new_node) |
| 3172 | made_progress = True |
| 3173 | except (ConnectionError, TimeoutError, OSError) as e: |
| 3174 | # Transient connectivity error while subscribing on the |
| 3175 | # new owner (or unsubscribing on the old owner if its |
| 3176 | # handler chose to re-raise). Do not abort reconciliation |
| 3177 | # for sibling channels: _shard_channel_to_node was not |
| 3178 | # advanced for this channel, so the next slots-cache |
| 3179 | # change notification will retry it. |
| 3180 | logger.warning( |
| 3181 | "shard channel %r migration deferred: %s: %s", |
| 3182 | channel, |
| 3183 | type(e).__name__, |
| 3184 | e, |
| 3185 | ) |
| 3186 | if first_migrate_error is None: |
| 3187 | first_migrate_error = e |
| 3188 | continue |
| 3189 | # Garbage-collect per-node pubsubs that no longer hold any |
| 3190 | # subscription so their connections are released. |
| 3191 | for name, pubsub in list(self.node_pubsub_mapping.items()): |
| 3192 | if not pubsub.subscribed: |
| 3193 | try: |
| 3194 | pubsub.reset() |
| 3195 | except Exception: |
| 3196 | pass |
| 3197 | self.node_pubsub_mapping.pop(name, None) |
| 3198 | if uncovered: |
| 3199 | # Surface the uncovered channels so the caller (and observer |
| 3200 | # notification path) knows reconciliation was incomplete. All |