MCPcopy
hub / github.com/redis/redis-py / test_pattern_message_handler

Method test_pattern_message_handler

tests/test_pubsub.py:799–807  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

797
798 @pytest.mark.onlynoncluster
799 def test_pattern_message_handler(self, r):
800 p = r.pubsub(ignore_subscribe_messages=True)
801 p.psubscribe(**{"f*": self.message_handler})
802 assert wait_for_message(p) is None
803 assert r.publish("foo", "test message") == 1
804 assert wait_for_message(p) is None
805 assert self.message == make_message(
806 "pmessage", "foo", "test message", pattern="f*"
807 )
808
809 @pytest.mark.onlynoncluster
810 def test_binary_pattern_message_handler_with_subscription(self, r):

Callers

nothing calls this directly

Calls 5

publishMethod · 0.80
wait_for_messageFunction · 0.70
make_messageFunction · 0.70
pubsubMethod · 0.45
psubscribeMethod · 0.45

Tested by

no test coverage detected