MCPcopy
hub / github.com/redis/redis-py / test_channel_message_handler

Method test_channel_message_handler

tests/test_pubsub.py:969–984  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

967 )
968
def test_channel_message_handler(self, r):
    """Check that a per-channel handler receives published messages.

    Subscribes to ``self.channel`` with ``self.message_handler`` registered
    as the callback for that channel, publishes ``self.data`` and expects
    ``self.message`` to equal the message built by ``self.make_message``,
    while ``wait_for_message`` itself returns ``None``. The same round trip
    is then repeated after the pubsub connection has been disconnected.

    NOTE(review): presumably ``self.message_handler`` is what stores the
    delivered message on ``self.message`` -- confirm against the class.
    """
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    handlers = {self.channel: self.message_handler}
    pubsub.subscribe(**handlers)
    assert wait_for_message(pubsub) is None

    # First round trip: nothing is returned to the caller, the payload is
    # expected on self.message instead.
    r.publish(self.channel, self.data)
    assert wait_for_message(pubsub) is None
    received = self.message
    expected = self.make_message("message", self.channel, self.data)
    assert received == expected

    # test that we reconnected to the correct channel
    self.message = None
    pubsub.connection.disconnect()
    assert wait_for_message(pubsub) is None  # should reconnect

    # Second round trip with a distinct payload, so a stale message from
    # before the disconnect cannot satisfy the assertion.
    payload_after_reconnect = self.data + "new data"
    r.publish(self.channel, payload_after_reconnect)
    assert wait_for_message(pubsub) is None
    received = self.message
    expected = self.make_message(
        "message", self.channel, payload_after_reconnect
    )
    assert received == expected
985
986 def test_pattern_message_handler(self, r):
987 p = r.pubsub(ignore_subscribe_messages=True)

Callers

nothing calls this directly

Calls 6

make_messageMethod · 0.95
publishMethod · 0.80
wait_for_messageFunction · 0.70
pubsubMethod · 0.45
subscribeMethod · 0.45
disconnectMethod · 0.45

Tested by

no test coverage detected