MCPcopy
hub / github.com/redis/redis-py / test_pubsub_channels

Method test_pubsub_channels

tests/test_pubsub.py:1052–1058  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

1050 @pytest.mark.onlynoncluster
1051 @skip_if_server_version_lt("2.8.0")
1052 def test_pubsub_channels(self, r):
1053 p = r.pubsub()
1054 p.subscribe("foo", "bar", "baz", "quux")
1055 for i in range(4):
1056 assert wait_for_message(p)["type"] == "subscribe"
1057 expected = [b"bar", b"baz", b"foo", b"quux"]
1058 assert all([channel in r.pubsub_channels() for channel in expected])
1059
1060 @pytest.mark.onlynoncluster
1061 @skip_if_server_version_lt("7.0.0")

Callers

nothing calls this directly

Calls 4

pubsub_channelsMethod · 0.80
wait_for_messageFunction · 0.70
pubsubMethod · 0.45
subscribeMethod · 0.45

Tested by

no test coverage detected