hub / github.com/redis/redis-py / test_xautoclaim

Method test_xautoclaim

tests/test_commands.py:6406–6436
(self, r)

Source from the content-addressed store, hash-verified

    @skip_if_server_version_lt("7.0.0")
    def test_xautoclaim(self, r):
        stream = "stream"
        group = "group"
        consumer1 = "consumer1"
        consumer2 = "consumer2"

        message_id1 = r.xadd(stream, {"john": "wick"})
        message_id2 = r.xadd(stream, {"johny": "deff"})
        message = get_stream_message(r, stream, message_id1)
        r.xgroup_create(stream, group, 0)

        # trying to claim a message that isn't already pending doesn't
        # do anything
        response = r.xautoclaim(stream, group, consumer2, min_idle_time=0)
        assert response == [b"0-0", [], []]

        # read the group as consumer1 to initially claim the messages
        r.xreadgroup(group, consumer1, streams={stream: ">"})

        # claim one message as consumer2
        response = r.xautoclaim(stream, group, consumer2, min_idle_time=0, count=1)
        assert response[1] == [message]

        # reclaim the messages as consumer1, but use the justid argument
        # which only returns message ids
        assert r.xautoclaim(
            stream, group, consumer1, min_idle_time=0, start_id=0, justid=True
        ) == [message_id1, message_id2]
        assert r.xautoclaim(
            stream, group, consumer1, min_idle_time=0, start_id=message_id2, justid=True
        ) == [message_id2]

    @skip_if_server_version_lt("6.2.0")
    def test_xautoclaim_negative(self, r):
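
The sequence of claims exercised by test_xautoclaim can be followed without a Redis server using a toy in-memory model of a consumer group's pending entries list. This is only a sketch under stated assumptions: ToyGroup, read_new, and autoclaim are hypothetical names that exist neither in redis-py nor in Redis, stream IDs are modelled as (milliseconds, sequence) tuples, and server details such as the deleted-IDs element of the Redis 7.0 reply are left out.

```python
from typing import Dict, List, Tuple

StreamId = Tuple[int, int]  # (milliseconds, sequence); tuples order like stream IDs


class ToyGroup:
    """Hypothetical in-memory stand-in for one consumer group (not a Redis API)."""

    def __init__(self, entries: Dict[StreamId, dict]) -> None:
        self.entries = dict(entries)
        self.last_delivered: StreamId = (0, 0)
        # Pending entries list: entry ID -> (owning consumer, last delivery time).
        self.pending: Dict[StreamId, Tuple[str, float]] = {}

    def read_new(self, consumer: str, now: float) -> None:
        """Loosely models XREADGROUP with ">": deliver never-delivered entries."""
        for entry_id in sorted(self.entries):
            if entry_id > self.last_delivered:
                self.pending[entry_id] = (consumer, now)
                self.last_delivered = entry_id

    def autoclaim(self, consumer, min_idle, now, start=(0, 0), count=100, justid=False):
        """Loosely models XAUTOCLAIM: returns (next_cursor, claimed)."""
        claimed: List = []
        next_cursor: StreamId = (0, 0)  # (0, 0) means the scan reached the end
        for entry_id in sorted(self.pending):
            if entry_id < start:
                continue
            if len(claimed) == count:
                next_cursor = entry_id  # resume here on the next call
                break
            _owner, delivered_at = self.pending[entry_id]
            if now - delivered_at >= min_idle:
                self.pending[entry_id] = (consumer, now)  # transfer ownership
                claimed.append(entry_id if justid else (entry_id, self.entries[entry_id]))
        return next_cursor, claimed


# Mirror the steps of the test: claim before anything is pending, read as
# consumer1, claim one entry as consumer2, then reclaim IDs only as consumer1.
group = ToyGroup({(1, 0): {"john": "wick"}, (2, 0): {"johny": "deff"}})
before_read = group.autoclaim("consumer2", min_idle=0, now=0.0)
group.read_new("consumer1", now=0.0)
one = group.autoclaim("consumer2", min_idle=0, now=0.0, count=1)
ids = group.autoclaim("consumer1", min_idle=0, now=0.0, justid=True)
```

In this model, as in the test, nothing can be claimed until a consumer has read from the group, because only delivered entries are pending. The count argument caps how many entries a single call transfers, and the returned cursor marks where a follow-up call could resume.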

Callers

nothing calls this directly

Calls 5

xadd · Method · 0.80
xgroup_create · Method · 0.80
xautoclaim · Method · 0.80
xreadgroup · Method · 0.80
get_stream_message · Function · 0.70

Tested by

no test coverage detected