hub / github.com/redis/redis-py / test_brpop

Method test_brpop

tests/test_asyncio/test_commands.py:2487–2506
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

@pytest.mark.onlynoncluster
async def test_brpop(self, r: redis.Redis):
    await r.rpush("a", "1", "2")
    await r.rpush("b", "3", "4")
    assert_resp_response(
        r, await r.brpop(["b", "a"], timeout=1), (b"b", b"4"), [b"b", b"4"]
    )
    assert_resp_response(
        r, await r.brpop(["b", "a"], timeout=1), (b"b", b"3"), [b"b", b"3"]
    )
    assert_resp_response(
        r, await r.brpop(["b", "a"], timeout=1), (b"a", b"2"), [b"a", b"2"]
    )
    assert_resp_response(
        r, await r.brpop(["b", "a"], timeout=1), (b"a", b"1"), [b"a", b"1"]
    )
    assert await r.brpop(["b", "a"], timeout=1) is None
    await r.rpush("c", "1")
    assert_resp_response(
        r, await r.brpop("c", timeout=1), (b"c", b"1"), [b"c", b"1"]
    )
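The assertions in test_brpop depend on the order in which BRPOP drains its keys: it pops from the tail of the first non-empty list, checking keys in the order given. The sketch below models only that ordering in memory, with no server and no blocking. `FakeLists` and its methods are hypothetical names introduced for illustration; they are not part of redis-py, and the real command would wait up to `timeout` seconds instead of returning `None` immediately.

```python
from __future__ import annotations

from collections import deque
from typing import Optional, Sequence, Union


class FakeLists:
    """Hypothetical in-memory stand-in for the two list commands used in the test."""

    def __init__(self) -> None:
        self._lists: dict[bytes, deque[bytes]] = {}

    def rpush(self, key: str, *values: str) -> int:
        # Append values to the tail of the list, creating it if needed.
        lst = self._lists.setdefault(key.encode(), deque())
        lst.extend(v.encode() for v in values)
        return len(lst)

    def brpop(self, keys: Union[str, Sequence[str]]) -> Optional[tuple[bytes, bytes]]:
        # Accept a single key or a sequence of keys, as the test does.
        if isinstance(keys, str):
            keys = [keys]
        # Scan keys in the order given and pop from the first non-empty list.
        for key in keys:
            name = key.encode()
            lst = self._lists.get(name)
            if lst:
                value = lst.pop()  # pop from the tail (right end)
                if not lst:
                    del self._lists[name]  # drop the key once its list is empty
                return name, value
        # Assumption: no blocking here; a real server would wait for the timeout.
        return None


db = FakeLists()
db.rpush("a", "1", "2")
db.rpush("b", "3", "4")
# Five pops against ["b", "a"]: "b" is drained first, then "a", then nothing is left.
results = [db.brpop(["b", "a"]) for _ in range(5)]
db.rpush("c", "1")
single = db.brpop("c")
```

Under this model, `results` follows the same sequence the test asserts: both elements of "b" from the tail, then both elements of "a", then `None`.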

Callers

nothing calls this directly

Calls 3

assert_resp_response · Function · 0.90
rpush · Method · 0.80
brpop · Method · 0.80
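Each `assert_resp_response` call in the test passes two expected values, a tuple and a list. A plausible reading is that the helper compares the response against the first when the client speaks RESP2 and against the second when it speaks RESP3. The sketch below illustrates that idea under this assumption. `check_resp_response` is a hypothetical name, and it takes the protocol version directly rather than inspecting a client object as the real helper does.

```python
def check_resp_response(protocol, response, resp2_expected, resp3_expected):
    """Hypothetical sketch: pick the expected value by protocol version."""
    # Assumption: an unset protocol (None) is treated like RESP2.
    if protocol in (2, "2", None):
        assert response == resp2_expected
    else:
        assert response == resp3_expected


# A RESP2-style reply is modelled as a tuple, a RESP3-style reply as a list.
check_resp_response(2, (b"b", b"4"), (b"b", b"4"), [b"b", b"4"])
check_resp_response(3, [b"b", b"4"], (b"b", b"4"), [b"b", b"4"])
```

Because a Python tuple never compares equal to a list, a helper of this shape would also catch a reply arriving in the wrong container type for the active protocol.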

Tested by

no test coverage detected