MCPcopy
hub / github.com/redis/redis-py / test_published_message_to_channel

Method test_published_message_to_channel

tests/test_pubsub.py:692–700  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

690 self.message = message
691
692 def test_published_message_to_channel(self, r):
693 p = r.pubsub()
694 p.subscribe("foo")
695 assert wait_for_message(p) == make_message("subscribe", "foo", 1)
696 assert r.publish("foo", "test message") == 1
697
698 message = wait_for_message(p)
699 assert isinstance(message, dict)
700 assert message == make_message("message", "foo", "test message")
701
702 @pytest.mark.onlynoncluster
703 @skip_if_server_version_lt("7.0.0")

Callers

nothing calls this directly

Calls 5

publishMethod · 0.80
wait_for_messageFunction · 0.70
make_messageFunction · 0.70
pubsubMethod · 0.45
subscribeMethod · 0.45

Tested by

no test coverage detected