(self, r: redis.Redis)
| 4756 | |
@skip_if_server_version_lt("5.0.0")
async def test_xclaim(self, r: redis.Redis):
    """Check xclaim results before and after a message becomes pending.

    Covers three cases: claiming a message that is not pending yet,
    claiming it for a second consumer once it has been read, and
    reclaiming it with ``justid=True``.
    """
    stream_key, group_name = "stream", "group"
    first_consumer, second_consumer = "consumer1", "consumer2"

    # seed the stream with one entry and record how it reads back
    entry_id = await r.xadd(stream_key, {"john": "wick"})
    expected_entry = await get_stream_message(r, stream_key, entry_id)
    await r.xgroup_create(stream_key, group_name, 0)

    # every claim below targets the same entry with no idle threshold
    claim_args = {"min_idle_time": 0, "message_ids": (entry_id,)}

    # a message that isn't pending yet cannot be claimed, so the
    # result is empty
    not_pending = await r.xclaim(
        stream_key, group_name, second_consumer, **claim_args
    )
    assert not_pending == []

    # reading the group as consumer1 is what initially claims the message
    await r.xreadgroup(group_name, first_consumer, streams={stream_key: ">"})

    # consumer2 now claims the message and gets it back in full
    claimed = await r.xclaim(
        stream_key, group_name, second_consumer, **claim_args
    )
    assert claimed[0] == expected_entry

    # consumer1 reclaims it with justid, which only returns message ids
    reclaimed_ids = await r.xclaim(
        stream_key, group_name, first_consumer, justid=True, **claim_args
    )
    assert reclaimed_ids == [entry_id]
| 4794 | |
| 4795 | @skip_if_server_version_lt("7.0.0") |
| 4796 | async def test_xclaim_trimmed(self, r: redis.Redis): |
nothing calls this directly
no test coverage detected