(self, r: redis.Redis)
| 5306 | |
| 5307 | @skip_if_server_version_lt("5.0.0") |
| 5308 | async def test_xreadgroup(self, r: redis.Redis): |
| 5309 | stream = "stream" |
| 5310 | group = "group" |
| 5311 | consumer = "consumer" |
| 5312 | m1 = await r.xadd(stream, {"foo": "bar"}) |
| 5313 | m2 = await r.xadd(stream, {"bing": "baz"}) |
| 5314 | await r.xgroup_create(stream, group, 0) |
| 5315 | |
| 5316 | strem_name = stream.encode() |
| 5317 | expected_entries = [ |
| 5318 | await get_stream_message(r, stream, m1), |
| 5319 | await get_stream_message(r, stream, m2), |
| 5320 | ] |
| 5321 | |
| 5322 | # xread starting at 0 returns both messages |
| 5323 | res = await r.xreadgroup(group, consumer, streams={stream: ">"}) |
| 5324 | assert_resp_response( |
| 5325 | r, |
| 5326 | res, |
| 5327 | [[strem_name, expected_entries]], |
| 5328 | {strem_name: [expected_entries]}, |
| 5329 | {strem_name: expected_entries}, |
| 5330 | ) |
| 5331 | |
| 5332 | await r.xgroup_destroy(stream, group) |
| 5333 | await r.xgroup_create(stream, group, 0) |
| 5334 | |
| 5335 | expected_entries = [await get_stream_message(r, stream, m1)] |
| 5336 | |
| 5337 | # xread with count=1 returns only the first message |
| 5338 | res = await r.xreadgroup(group, consumer, streams={stream: ">"}, count=1) |
| 5339 | assert_resp_response( |
| 5340 | r, |
| 5341 | res, |
| 5342 | [[strem_name, expected_entries]], |
| 5343 | {strem_name: [expected_entries]}, |
| 5344 | {strem_name: expected_entries}, |
| 5345 | ) |
| 5346 | |
| 5347 | await r.xgroup_destroy(stream, group) |
| 5348 | |
| 5349 | # create the group using $ as the last id meaning subsequent reads |
| 5350 | # will only find messages added after this |
| 5351 | await r.xgroup_create(stream, group, "$") |
| 5352 | |
| 5353 | # xread starting after the last message returns an empty message list |
| 5354 | res = await r.xreadgroup(group, consumer, streams={stream: ">"}) |
| 5355 | assert_resp_response(r, res, [], {}) |
| 5356 | |
| 5357 | # xreadgroup with noack does not have any items in the PEL |
| 5358 | await r.xgroup_destroy(stream, group) |
| 5359 | await r.xgroup_create(stream, group, "0") |
| 5360 | res = await r.xreadgroup(group, consumer, streams={stream: ">"}, noack=True) |
| 5361 | empty_res = await r.xreadgroup(group, consumer, streams={stream: "0"}) |
| 5362 | shape = expected_response_shape(r) |
| 5363 | if shape == "legacy_resp2": |
| 5364 | assert len(res[0][1]) == 2 |
| 5365 | # now there should be nothing pending |
nothing calls this directly
no test coverage detected