(self, r: redis.Redis)
| 2463 | # LIST COMMANDS |
| 2464 | @pytest.mark.onlynoncluster |
| 2465 | async def test_blpop(self, r: redis.Redis): |
| 2466 | await r.rpush("a", "1", "2") |
| 2467 | await r.rpush("b", "3", "4") |
| 2468 | assert_resp_response( |
| 2469 | r, await r.blpop(["b", "a"], timeout=1), (b"b", b"3"), [b"b", b"3"] |
| 2470 | ) |
| 2471 | assert_resp_response( |
| 2472 | r, await r.blpop(["b", "a"], timeout=1), (b"b", b"4"), [b"b", b"4"] |
| 2473 | ) |
| 2474 | assert_resp_response( |
| 2475 | r, await r.blpop(["b", "a"], timeout=1), (b"a", b"1"), [b"a", b"1"] |
| 2476 | ) |
| 2477 | assert_resp_response( |
| 2478 | r, await r.blpop(["b", "a"], timeout=1), (b"a", b"2"), [b"a", b"2"] |
| 2479 | ) |
| 2480 | assert await r.blpop(["b", "a"], timeout=1) is None |
| 2481 | await r.rpush("c", "1") |
| 2482 | assert_resp_response( |
| 2483 | r, await r.blpop("c", timeout=1), (b"c", b"1"), [b"c", b"1"] |
| 2484 | ) |
| 2485 | |
| 2486 | @pytest.mark.onlynoncluster |
| 2487 | async def test_brpop(self, r: redis.Redis): |
nothing calls this directly
no test coverage detected