(self, r: redis.Redis)
| 5187 | |
@skip_if_server_version_lt("5.0.0")
async def test_xpending(self, r: redis.Redis):
    """Check the XPENDING summary before and after two consumers read."""
    stream = "stream"
    group = "group"
    consumer1, consumer2 = "consumer1", "consumer2"
    readers = (consumer1, consumer2)

    # Two entries, so that each consumer can pick up exactly one of them.
    m1 = await r.xadd(stream, {"foo": "bar"})
    m2 = await r.xadd(stream, {"foo": "bar"})
    await r.xgroup_create(stream, group, 0)

    # The group has no consumers yet, so the summary must be empty.
    summary = await r.xpending(stream, group)
    assert summary == {"pending": 0, "min": None, "max": None, "consumers": []}

    # Every consumer reads a single new message from the group.
    for reader in readers:
        await r.xreadgroup(group, reader, streams={stream: ">"}, count=1)

    # Both entries are now pending, one per consumer; names are compared
    # as encoded bytes.
    summary = await r.xpending(stream, group)
    assert summary == {
        "pending": 2,
        "min": m1,
        "max": m2,
        "consumers": [
            {"name": reader.encode(), "pending": 1} for reader in readers
        ],
    }
| 5216 | |
| 5217 | @skip_if_server_version_lt("5.0.0") |
| 5218 | async def test_xpending_range(self, r: redis.Redis): |
nothing calls this directly
no test coverage detected