hub / github.com/redis/redis-py / test_channel_message_handler

Method test_channel_message_handler

tests/test_pubsub.py:751–757
(self, r)

Source from the content-addressed store, hash-verified

749         assert message1 != message2
750
751     def test_channel_message_handler(self, r):
752         p = r.pubsub(ignore_subscribe_messages=True)
753         p.subscribe(foo=self.message_handler)
754         assert wait_for_message(p) is None
755         assert r.publish("foo", "test message") == 1
756         assert wait_for_message(p) is None
757         assert self.message == make_message("message", "foo", "test message")
758
759     @pytest.mark.onlynoncluster
760     def test_binary_channel_message_handler_with_subscription(self, r):
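
The test above relies on two behaviours: with ignore_subscribe_messages=True the subscribe confirmation is never returned to the caller, and a message on a channel that has a registered handler is passed to that handler rather than returned. The following is a minimal in-memory sketch of that dispatch logic so it can be run without a Redis server. FakePubSub, its deliver method, and the exact dict layout of a message are hypothetical stand-ins for illustration; this is not redis-py's implementation.

```python
from collections import deque


class FakePubSub:
    """Hypothetical toy object mimicking the handler dispatch the test checks."""

    def __init__(self, ignore_subscribe_messages=False):
        self.ignore_subscribe_messages = ignore_subscribe_messages
        self.handlers = {}      # channel name -> callable (or None)
        self.pending = deque()  # messages waiting to be read

    def subscribe(self, **channel_handlers):
        # Keyword form, as in p.subscribe(foo=handler): name -> callable.
        for channel, handler in channel_handlers.items():
            self.handlers[channel] = handler
            self.pending.append(
                {"type": "subscribe", "pattern": None,
                 "channel": channel, "data": len(self.handlers)}
            )

    def deliver(self, channel, data):
        # Stand-in for a publish: returns how many subscribers were reached.
        if channel not in self.handlers:
            return 0
        self.pending.append(
            {"type": "message", "pattern": None,
             "channel": channel, "data": data}
        )
        return 1

    def get_message(self):
        if not self.pending:
            return None
        message = self.pending.popleft()
        if message["type"] == "subscribe":
            # Confirmation is swallowed when the flag is set.
            return None if self.ignore_subscribe_messages else message
        handler = self.handlers.get(message["channel"])
        if handler is not None:
            handler(message)  # handled messages are not returned
            return None
        return message


received = []
p = FakePubSub(ignore_subscribe_messages=True)
p.subscribe(foo=received.append)
first = p.get_message()                       # subscribe confirmation
delivered = p.deliver("foo", "test message")
second = p.get_message()                      # routed to the handler
```

In this sketch both reads come back as None, which mirrors the two `wait_for_message(p) is None` assertions, and the message itself ends up in `received`, which plays the role of `self.message` in the test.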

Callers

nothing calls this directly

Calls 5

publish · Method · 0.80
wait_for_message · Function · 0.70
make_message · Function · 0.70
pubsub · Method · 0.45
subscribe · Method · 0.45

Tested by

no test coverage detected