MCPcopy
hub / github.com/redis/redis-py / test_xreadgroup

Method test_xreadgroup

tests/test_asyncio/test_commands.py:5308–5389  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

5306
5307 @skip_if_server_version_lt("5.0.0")
5308 async def test_xreadgroup(self, r: redis.Redis):
5309 stream = "stream"
5310 group = "group"
5311 consumer = "consumer"
5312 m1 = await r.xadd(stream, {"foo": "bar"})
5313 m2 = await r.xadd(stream, {"bing": "baz"})
5314 await r.xgroup_create(stream, group, 0)
5315
5316 strem_name = stream.encode()
5317 expected_entries = [
5318 await get_stream_message(r, stream, m1),
5319 await get_stream_message(r, stream, m2),
5320 ]
5321
5322 # xreadgroup on a group created at ID 0 returns both messages
5323 res = await r.xreadgroup(group, consumer, streams={stream: ">"})
5324 assert_resp_response(
5325 r,
5326 res,
5327 [[strem_name, expected_entries]],
5328 {strem_name: [expected_entries]},
5329 {strem_name: expected_entries},
5330 )
5331
5332 await r.xgroup_destroy(stream, group)
5333 await r.xgroup_create(stream, group, 0)
5334
5335 expected_entries = [await get_stream_message(r, stream, m1)]
5336
5337 # xreadgroup with count=1 returns only the first message
5338 res = await r.xreadgroup(group, consumer, streams={stream: ">"}, count=1)
5339 assert_resp_response(
5340 r,
5341 res,
5342 [[strem_name, expected_entries]],
5343 {strem_name: [expected_entries]},
5344 {strem_name: expected_entries},
5345 )
5346
5347 await r.xgroup_destroy(stream, group)
5348
5349 # create the group using $ as the last id meaning subsequent reads
5350 # will only find messages added after this
5351 await r.xgroup_create(stream, group, "$")
5352
5353 # xreadgroup starting after the last message returns an empty message list
5354 res = await r.xreadgroup(group, consumer, streams={stream: ">"})
5355 assert_resp_response(r, res, [], {})
5356
5357 # xreadgroup with noack does not have any items in the PEL
5358 await r.xgroup_destroy(stream, group)
5359 await r.xgroup_create(stream, group, "0")
5360 res = await r.xreadgroup(group, consumer, streams={stream: ">"}, noack=True)
5361 empty_res = await r.xreadgroup(group, consumer, streams={stream: "0"})
5362 shape = expected_response_shape(r)
5363 if shape == "legacy_resp2":
5364 assert len(res[0][1]) == 2
5365 # now there should be nothing pending

Callers

nothing calls this directly

Calls 9

assert_resp_response · Function · 0.90
expected_response_shape · Function · 0.90
xadd · Method · 0.80
xgroup_create · Method · 0.80
encode · Method · 0.80
xreadgroup · Method · 0.80
xgroup_destroy · Method · 0.80
xtrim · Method · 0.80
get_stream_message · Function · 0.70

Tested by

no test coverage detected