MCPcopy
hub / github.com/redis/redis-py / test_xclaim

Method test_xclaim

tests/test_asyncio/test_commands.py:4758–4793  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

4756
@skip_if_server_version_lt("5.0.0")
async def test_xclaim(self, r: redis.Redis):
    """Check ``xclaim`` before and after a stream message has been read.

    Three claims are attempted on the same single message:

    1. before any consumer has read it, expecting an empty list;
    2. after ``consumer1`` has read the group, as ``consumer2``, expecting
       the full message back;
    3. again as ``consumer1`` with ``justid=True``, expecting only the
       message id.
    """
    stream, group = "stream", "group"
    consumer1, consumer2 = "consumer1", "consumer2"

    # Seed the stream with one entry and keep both its id and the
    # message as reported by the get_stream_message helper.
    message_id = await r.xadd(stream, {"john": "wick"})
    message = await get_stream_message(r, stream, message_id)
    await r.xgroup_create(stream, group, 0)

    # Every claim below targets the same message with no idle threshold.
    claim_args = {"min_idle_time": 0, "message_ids": (message_id,)}

    # The message is not pending yet, so claiming it does nothing.
    not_pending = await r.xclaim(stream, group, consumer2, **claim_args)
    assert not_pending == []

    # Read the group as consumer1 so that the message is initially
    # claimed by that consumer.
    await r.xreadgroup(group, consumer1, streams={stream: ">"})

    # consumer2 takes the message over and gets the full message back.
    taken_over = await r.xclaim(stream, group, consumer2, **claim_args)
    assert taken_over[0] == message

    # consumer1 reclaims it; with justid only the message ids come back.
    reclaimed_ids = await r.xclaim(
        stream, group, consumer1, justid=True, **claim_args
    )
    assert reclaimed_ids == [message_id]
4794
4795 @skip_if_server_version_lt("7.0.0")
4796 async def test_xclaim_trimmed(self, r: redis.Redis):

Callers

nothing calls this directly

Calls 5

xaddMethod · 0.80
xgroup_createMethod · 0.80
xclaimMethod · 0.80
xreadgroupMethod · 0.80
get_stream_messageFunction · 0.70

Tested by

no test coverage detected