(self, r)
| 3244 | # LIST COMMANDS |
| 3245 | @pytest.mark.onlynoncluster |
| 3246 | def test_blpop(self, r): |
| 3247 | r.rpush("a", "1", "2") |
| 3248 | r.rpush("b", "3", "4") |
| 3249 | assert_resp_response( |
| 3250 | r, r.blpop(["b", "a"], timeout=1), (b"b", b"3"), [b"b", b"3"] |
| 3251 | ) |
| 3252 | assert_resp_response( |
| 3253 | r, r.blpop(["b", "a"], timeout=1), (b"b", b"4"), [b"b", b"4"] |
| 3254 | ) |
| 3255 | assert_resp_response( |
| 3256 | r, r.blpop(["b", "a"], timeout=1), (b"a", b"1"), [b"a", b"1"] |
| 3257 | ) |
| 3258 | assert_resp_response( |
| 3259 | r, r.blpop(["b", "a"], timeout=1), (b"a", b"2"), [b"a", b"2"] |
| 3260 | ) |
| 3261 | assert r.blpop(["b", "a"], timeout=1) is None |
| 3262 | r.rpush("c", "1") |
| 3263 | assert_resp_response(r, r.blpop("c", timeout=1), (b"c", b"1"), [b"c", b"1"]) |
| 3264 | |
| 3265 | @pytest.mark.onlynoncluster |
| 3266 | def test_brpop(self, r): |
nothing calls this directly
no test coverage detected