(self, r: redis.Redis)
| 2485 | |
| 2486 | @pytest.mark.onlynoncluster |
| 2487 | async def test_brpop(self, r: redis.Redis): |
| 2488 | await r.rpush("a", "1", "2") |
| 2489 | await r.rpush("b", "3", "4") |
| 2490 | assert_resp_response( |
| 2491 | r, await r.brpop(["b", "a"], timeout=1), (b"b", b"4"), [b"b", b"4"] |
| 2492 | ) |
| 2493 | assert_resp_response( |
| 2494 | r, await r.brpop(["b", "a"], timeout=1), (b"b", b"3"), [b"b", b"3"] |
| 2495 | ) |
| 2496 | assert_resp_response( |
| 2497 | r, await r.brpop(["b", "a"], timeout=1), (b"a", b"2"), [b"a", b"2"] |
| 2498 | ) |
| 2499 | assert_resp_response( |
| 2500 | r, await r.brpop(["b", "a"], timeout=1), (b"a", b"1"), [b"a", b"1"] |
| 2501 | ) |
| 2502 | assert await r.brpop(["b", "a"], timeout=1) is None |
| 2503 | await r.rpush("c", "1") |
| 2504 | assert_resp_response( |
| 2505 | r, await r.brpop("c", timeout=1), (b"c", b"1"), [b"c", b"1"] |
| 2506 | ) |
| 2507 | |
| 2508 | @pytest.mark.onlynoncluster |
| 2509 | async def test_brpoplpush(self, r: redis.Redis): |
nothing calls this directly
no test coverage detected