Get or create a PubSub instance for the given node.
(self, node: "ClusterNode")
| 3510 | await self._resubscribe(subscriptions, self.ssubscribe) |
| 3511 | |
| 3512 | def _get_node_pubsub(self, node: "ClusterNode") -> PubSub: |
| 3513 | """Get or create a PubSub instance for the given node.""" |
| 3514 | try: |
| 3515 | return self.node_pubsub_mapping[node.name] |
| 3516 | except KeyError: |
| 3517 | pubsub = PubSub( |
| 3518 | connection_pool=_ClusterNodePoolAdapter(node), |
| 3519 | encoder=self.cluster.encoder, |
| 3520 | push_handler_func=self.push_handler_func, |
| 3521 | event_dispatcher=self._event_dispatcher, |
| 3522 | ) |
| 3523 | # Replay shard subscriptions on reconnect with slot-aware grouping |
| 3524 | # so that channels spanning multiple slots owned by this node do |
| 3525 | # not trigger a CROSSSLOT error. |
| 3526 | pubsub._resubscribe_shard_channels = MethodType( |
| 3527 | ClusterPubSub._resubscribe_shard_channels, pubsub |
| 3528 | ) |
| 3529 | self.node_pubsub_mapping[node.name] = pubsub |
| 3530 | return pubsub |
| 3531 | |
| 3532 | def _find_node_name_for_pubsub(self, pubsub: PubSub) -> Optional[str]: |
| 3533 | for name, candidate in self.node_pubsub_mapping.items(): |
no test coverage detected