MCPcopy
hub / github.com/redis/redis-py / test_xclaim

Method test_xclaim

tests/test_commands.py:6451–6485  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

6449
6450 @skip_if_server_version_lt("5.0.0")
6451 def test_xclaim(self, r):
6452 stream = "stream"
6453 group = "group"
6454 consumer1 = "consumer1"
6455 consumer2 = "consumer2"
6456 message_id = r.xadd(stream, {"john": "wick"})
6457 message = get_stream_message(r, stream, message_id)
6458 r.xgroup_create(stream, group, 0)
6459
6460 # trying to claim a message that isn't already pending doesn't
6461 # do anything
6462 response = r.xclaim(
6463 stream, group, consumer2, min_idle_time=0, message_ids=(message_id,)
6464 )
6465 assert response == []
6466
6467 # read the group as consumer1 to initially claim the messages
6468 r.xreadgroup(group, consumer1, streams={stream: ">"})
6469
6470 # claim the message as consumer2
6471 response = r.xclaim(
6472 stream, group, consumer2, min_idle_time=0, message_ids=(message_id,)
6473 )
6474 assert response[0] == message
6475
6476 # reclaim the message as consumer1, but use the justid argument
6477 # which only returns message ids
6478 assert r.xclaim(
6479 stream,
6480 group,
6481 consumer1,
6482 min_idle_time=0,
6483 message_ids=(message_id,),
6484 justid=True,
6485 ) == [message_id]
6486
6487 @skip_if_server_version_lt("7.0.0")
6488 def test_xclaim_trimmed(self, r):

Callers

nothing calls this directly

Calls 5

xaddMethod · 0.80
xgroup_createMethod · 0.80
xclaimMethod · 0.80
xreadgroupMethod · 0.80
get_stream_messageFunction · 0.70

Tested by

no test coverage detected