MCPcopy
hub / github.com/redis/redis-py / PubSub

Class PubSub

redis/client.py:968–1613  ·  view source on GitHub ↗

PubSub provides publish, subscribe and listen support to Redis channels. After subscribing to one or more channels, the listen() method will block until a message arrives on one of the subscribed channels. That message will be returned and it's safe to start listening again.

Source from the content-addressed store, hash-verified

966
967
968class PubSub:
969 """
970 PubSub provides publish, subscribe and listen support to Redis channels.
971
972 After subscribing to one or more channels, the listen() method will block
973 until a message arrives on one of the subscribed channels. That message
974 will be returned and it's safe to start listening again.
975 """
976
977 PUBLISH_MESSAGE_TYPES = ("message", "pmessage", "smessage")
978 UNSUBSCRIBE_MESSAGE_TYPES = ("unsubscribe", "punsubscribe", "sunsubscribe")
979 HEALTH_CHECK_MESSAGE = "redis-py-health-check"
980
981 def __init__(
982 self,
983 connection_pool,
984 shard_hint=None,
985 ignore_subscribe_messages: bool = False,
986 encoder: Optional["Encoder"] = None,
987 push_handler_func: Union[None, Callable[[str], None]] = None,
988 event_dispatcher: Optional["EventDispatcher"] = None,
989 ):
990 self.connection_pool = connection_pool
991 self.shard_hint = shard_hint
992 self.ignore_subscribe_messages = ignore_subscribe_messages
993 self.connection = None
994 self.subscribed_event = threading.Event()
995 # we need to know the encoding options for this connection in order
996 # to lookup channel and pattern names for callback handlers.
997 self.encoder = encoder
998 self.push_handler_func = push_handler_func
999 if event_dispatcher is None:
1000 self._event_dispatcher = EventDispatcher()
1001 else:
1002 self._event_dispatcher = event_dispatcher
1003
1004 self._lock = threading.RLock()
1005 if self.encoder is None:
1006 self.encoder = self.connection_pool.get_encoder()
1007 self.health_check_response_b = self.encoder.encode(self.HEALTH_CHECK_MESSAGE)
1008 if self.encoder.decode_responses:
1009 self.health_check_response = ["pong", self.HEALTH_CHECK_MESSAGE]
1010 else:
1011 self.health_check_response = [b"pong", self.health_check_response_b]
1012 if self.push_handler_func is None:
1013 _set_info_logger()
1014 self.reset()
1015
1016 def __enter__(self) -> "PubSub":
1017 return self
1018
1019 def __exit__(self, exc_type, exc_value, traceback) -> None:
1020 self.reset()
1021
1022 def __del__(self) -> None:
1023 try:
1024 # if this object went out of scope prior to shutting down
1025 # subscriptions, close the connection manually before

Calls

no outgoing calls