hub / github.com/redis/redis-py / test_cluster_brpop

Method test_cluster_brpop

tests/test_cluster.py:2044–2075
(self, r)

Source from the content-addressed store, hash-verified

    def test_cluster_brpop(self, r):
        r.rpush("{foo}a", "1", "2")
        r.rpush("{foo}b", "3", "4")
        assert_resp_response(
            r,
            r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"4"),
            [b"{foo}b", b"4"],
        )
        assert_resp_response(
            r,
            r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}b", b"3"),
            [b"{foo}b", b"3"],
        )
        assert_resp_response(
            r,
            r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"2"),
            [b"{foo}a", b"2"],
        )
        assert_resp_response(
            r,
            r.brpop(["{foo}b", "{foo}a"], timeout=1),
            (b"{foo}a", b"1"),
            [b"{foo}a", b"1"],
        )
        assert r.brpop(["{foo}b", "{foo}a"], timeout=1) is None
        r.rpush("{foo}c", "1")
        assert_resp_response(
            r, r.brpop("{foo}c", timeout=1), (b"{foo}c", b"1"), [b"{foo}c", b"1"]
        )
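Every key in the test shares the `{foo}` hash tag. In Redis Cluster, a multi-key command such as BRPOP needs all of its keys in the same hash slot, and a hash tag is the usual way to arrange that. The sketch below is an illustration of the slot calculation as described in the Redis Cluster specification, not code from redis-py; the `key_slot` name is hypothetical.

```python
import binascii


def key_slot(key: bytes) -> int:
    """Map a key to one of 16384 cluster slots, honoring {hash tags}."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        # Only a non-empty tag counts; "{}" falls back to hashing the whole key.
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    # crc_hqx with an initial value of 0 is CRC16/XMODEM,
    # the CRC16 variant the Redis Cluster specification names.
    return binascii.crc_hqx(key, 0) % 16384


# The three keys used by the test hash only the tag "foo",
# so they all land in one slot.
slots = {k: key_slot(k) for k in (b"{foo}a", b"{foo}b", b"{foo}c")}
assert len(set(slots.values())) == 1
```

Without the shared tag, `a` and `b` would generally hash to different slots, and a cluster client could not route a single BRPOP across both keys.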

Callers

nothing calls this directly

Calls 3

assert_resp_response · Function · 0.85
rpush · Method · 0.80
brpop · Method · 0.80
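The three calls listed above can be modelled without a server to show why the test expects the order 4, 3, 2, 1. This is a toy sketch under stated assumptions: `FakeLists` and `check_resp` are hypothetical stand-ins, not redis-py's client or its `assert_resp_response` helper. The only behavior taken from the test itself is that a RESP2 reply is compared against a tuple and a RESP3 reply against a list.

```python
from collections import defaultdict


class FakeLists:
    """Toy in-memory stand-in for Redis lists (hypothetical, not redis-py)."""

    def __init__(self, protocol=2):
        self.protocol = protocol
        self.lists = defaultdict(list)

    def rpush(self, key, *values):
        # RPUSH appends at the tail, so the last value pushed is popped first.
        self.lists[key].extend(v.encode() for v in values)
        return len(self.lists[key])

    def brpop(self, keys, timeout=0):
        if isinstance(keys, str):
            keys = [keys]
        # Keys are checked in the order given; the first non-empty list wins.
        for key in keys:
            if self.lists[key]:
                pair = (key.encode(), self.lists[key].pop())
                return pair if self.protocol == 2 else list(pair)
        # A real BRPOP would block for up to `timeout` seconds before giving up.
        return None


def check_resp(client, response, resp2_expected, resp3_expected):
    """Compare against the expectation that matches the client's protocol."""
    expected = resp2_expected if client.protocol == 2 else resp3_expected
    assert response == expected


for protocol in (2, 3):
    r = FakeLists(protocol)
    r.rpush("{foo}a", "1", "2")
    r.rpush("{foo}b", "3", "4")
    keys = ["{foo}b", "{foo}a"]
    # "{foo}b" is listed first, so it is drained before "{foo}a" is touched.
    check_resp(r, r.brpop(keys, timeout=1), (b"{foo}b", b"4"), [b"{foo}b", b"4"])
    check_resp(r, r.brpop(keys, timeout=1), (b"{foo}b", b"3"), [b"{foo}b", b"3"])
    check_resp(r, r.brpop(keys, timeout=1), (b"{foo}a", b"2"), [b"{foo}a", b"2"])
    check_resp(r, r.brpop(keys, timeout=1), (b"{foo}a", b"1"), [b"{foo}a", b"1"])
    assert r.brpop(keys, timeout=1) is None
```

Passing both expected shapes to one helper lets a single test body run unchanged against either protocol version, which is presumably why the test supplies a tuple and a list for every assertion.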

Tested by

no test coverage detected