PubSub provides publish, subscribe and listen support to Redis channels. After subscribing to one or more channels, the listen() method will block until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again.
| 966 | |
| 967 | |
| 968 | class PubSub: |
| 969 | """ |
| 970 | PubSub provides publish, subscribe and listen support to Redis channels. |
| 971 | |
| 972 | After subscribing to one or more channels, the listen() method will block |
| 973 | until a message arrives on one of the subscribed channels. That message |
| 974 | will be returned and it's safe to start listening again. |
| 975 | """ |
| 976 | |
# Message-type names grouped by category.
# NOTE(review): presumably compared against the type of incoming pub/sub
# messages elsewhere in the class -- confirm against the message handler.
PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
# Payload that __init__ encodes and pairs with "pong" to build the expected
# health-check reply.
HEALTH_CHECK_MESSAGE = "redis-py-health-check"
| 980 | |
| 981 | def __init__( |
| 982 | self, |
| 983 | connection_pool, |
| 984 | shard_hint=None, |
| 985 | ignore_subscribe_messages: bool = False, |
| 986 | encoder: Optional["Encoder"] = None, |
| 987 | push_handler_func: Union[None, Callable[[str], None]] = None, |
| 988 | event_dispatcher: Optional["EventDispatcher"] = None, |
| 989 | ): |
| 990 | self.connection_pool = connection_pool |
| 991 | self.shard_hint = shard_hint |
| 992 | self.ignore_subscribe_messages = ignore_subscribe_messages |
| 993 | self.connection = None |
| 994 | self.subscribed_event = threading.Event() |
| 995 | # we need to know the encoding options for this connection in order |
| 996 | # to lookup channel and pattern names for callback handlers. |
| 997 | self.encoder = encoder |
| 998 | self.push_handler_func = push_handler_func |
| 999 | if event_dispatcher is None: |
| 1000 | self._event_dispatcher = EventDispatcher() |
| 1001 | else: |
| 1002 | self._event_dispatcher = event_dispatcher |
| 1003 | |
| 1004 | self._lock = threading.RLock() |
| 1005 | if self.encoder is None: |
| 1006 | self.encoder = self.connection_pool.get_encoder() |
| 1007 | self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE) |
| 1008 | if self.encoder.decode_responses: |
| 1009 | self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE] |
| 1010 | else: |
| 1011 | self.health_check_response = [b"pong", self.health_check_response_b] |
| 1012 | if self.push_handler_func is None: |
| 1013 | _set_info_logger() |
| 1014 | self.reset() |
| 1015 | |
| 1016 | def __enter__(self) -> "PubSub": |
| 1017 | return self |
| 1018 | |
| 1019 | def __exit__(self, exc_type, exc_value, traceback) -> None: |
| 1020 | self.reset() |
| 1021 | |
| 1022 | def __del__(self) -> None: |
| 1023 | try: |
| 1024 | # if this object went out of scope prior to shutting down |
| 1025 | # subscriptions, close the connection manually before |
no outgoing calls