MCPcopy
hub / github.com/redis/redis-py / test_published_message_to_pattern

Method test_published_message_to_pattern

tests/test_pubsub.py:728–749  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

726 assert message == make_message("smessage", "foo", "test message")
727
728 def test_published_message_to_pattern(self, r):
729 p = r.pubsub()
730 p.subscribe("foo")
731 p.psubscribe("f*")
732 assert wait_for_message(p) == make_message("subscribe", "foo", 1)
733 assert wait_for_message(p) == make_message("psubscribe", "f*", 2)
734 # 1 to pattern, 1 to channel
735 assert r.publish("foo", "test message") == 2
736
737 message1 = wait_for_message(p)
738 message2 = wait_for_message(p)
739 assert isinstance(message1, dict)
740 assert isinstance(message2, dict)
741
742 expected = [
743 make_message("message", "foo", "test message"),
744 make_message("pmessage", "foo", "test message", pattern="f*"),
745 ]
746
747 assert message1 in expected
748 assert message2 in expected
749 assert message1 != message2
750
751 def test_channel_message_handler(self, r):
752 p = r.pubsub(ignore_subscribe_messages=True)

Callers

nothing calls this directly

Calls 6

publishMethod · 0.80
wait_for_messageFunction · 0.70
make_messageFunction · 0.70
pubsubMethod · 0.45
subscribeMethod · 0.45
psubscribeMethod · 0.45

Tested by

no test coverage detected