MCPcopy
hub / github.com/redis/redis-py / test_cluster_pubsub_channels

Method test_cluster_pubsub_channels

tests/test_cluster.py:1298–1306  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

1296
1297 @skip_if_server_version_lt("2.8.0")
1298 def test_cluster_pubsub_channels(self, r):
1299 p = r.pubsub()
1300 p.subscribe("foo", "bar", "baz", "quux")
1301 for i in range(4):
1302 assert wait_for_message(p, timeout=0.5)["type"] == "subscribe"
1303 expected = [b"bar", b"baz", b"foo", b"quux"]
1304 assert all(
1305 [channel in r.pubsub_channels(target_nodes="all") for channel in expected]
1306 )
1307
1308 @skip_if_server_version_lt("2.8.0")
1309 def test_cluster_pubsub_numsub(self, r):

Callers

nothing calls this directly

Calls 4

wait_for_messageFunction · 0.90
pubsub_channelsMethod · 0.80
pubsubMethod · 0.45
subscribeMethod · 0.45

Tested by

no test coverage detected