github.com/redis/redis-py

Method test_pubsub_deadlock

tests/test_pubsub.py:1434–1441
(self, master_host)

Source from the content-addressed store, hash-verified

class TestPubSubDeadlock:
    @pytest.mark.timeout(30, method="thread")
    def test_pubsub_deadlock(self, master_host):
        pool = redis.ConnectionPool(host=master_host[0], port=master_host[1])
        r = redis.Redis(connection_pool=pool)

        for i in range(60):
            p = r.pubsub()
            p.subscribe("my-channel-1", "my-channel-2")
            pool.reset()
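The test above has no assertions: it passes if the subscribe/reset loop finishes and fails only if the timeout fires. The source does not say what kind of hang it guards against, so the following is only a sketch of the general pattern, not redis-py's implementation. `ToyPool` and `finishes_within` are hypothetical names invented for illustration. The toy pool re-enters its own lock during `reset()`, which hangs with a non-reentrant `threading.Lock` and completes with a `threading.RLock`.

```python
import threading


def finishes_within(fn, seconds):
    """Run fn in a daemon thread and report whether it returned in time."""
    worker = threading.Thread(target=fn, daemon=True)
    worker.start()
    worker.join(seconds)
    return not worker.is_alive()


class ToyPool:
    """Hypothetical stand-in for a connection pool (NOT redis.ConnectionPool)."""

    def __init__(self, lock):
        self._lock = lock
        self._in_use = ["conn-1", "conn-2"]

    def release(self, conn):
        # Takes the pool lock to update shared state.
        with self._lock:
            self._in_use.remove(conn)

    def reset(self):
        # Holds the pool lock and then calls release(), which takes it again.
        with self._lock:
            for conn in list(self._in_use):
                self.release(conn)


if __name__ == "__main__":
    # A reentrant lock lets the same thread acquire it twice.
    print(finishes_within(ToyPool(threading.RLock()).reset, 1.0))  # True
    # A plain Lock blocks on the second acquire, so reset() never returns.
    print(finishes_within(ToyPool(threading.Lock()).reset, 0.2))   # False
```

A check like this turns a silent hang into a visible failure, which is the role the `@pytest.mark.timeout(30, method="thread")` marker plays in the real test.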

Callers

nothing calls this directly

Calls 3

pubsub · Method · 0.95
reset · Method · 0.95
subscribe · Method · 0.45

Tested by

no test coverage detected