
Method test_xack

tests/test_commands.py:6308–6326
(self, r)

Source from the content-addressed store, hash-verified

@skip_if_server_version_lt("5.0.0")
def test_xack(self, r):
    stream = "stream"
    group = "group"
    consumer = "consumer"
    # xack on a stream that doesn't exist
    assert r.xack(stream, group, "0-0") == 0

    m1 = r.xadd(stream, {"one": "one"})
    m2 = r.xadd(stream, {"two": "two"})
    m3 = r.xadd(stream, {"three": "three"})

    # xack on a group that doesn't exist
    assert r.xack(stream, group, m1) == 0

    r.xgroup_create(stream, group, 0)
    r.xreadgroup(group, consumer, streams={stream: ">"})
    # xack returns the number of ack'd elements
    assert r.xack(stream, group, m1) == 1
    assert r.xack(stream, group, m2, m3) == 2
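The return values asserted above follow from how acknowledgement works: `xack` only counts IDs that are currently pending for the group, so a missing stream or a missing group yields 0. The sketch below is a toy in-memory model of that bookkeeping, written for illustration only. `ToyStreams` and its simplified method signatures are hypothetical, it ignores consumers and field data, and it is neither redis-py's API nor the Redis server's implementation.

```python
class ToyStreams:
    """Hypothetical in-memory stand-in for stream/group bookkeeping."""

    def __init__(self):
        # stream name -> {"entries": [ids], "groups": {group: state}}
        self._streams = {}
        self._seq = 0

    def xadd(self, stream):
        # Create the stream on first use and append a fresh, increasing ID.
        s = self._streams.setdefault(stream, {"entries": [], "groups": {}})
        self._seq += 1
        entry_id = f"{self._seq}-0"
        s["entries"].append(entry_id)
        return entry_id

    def xgroup_create(self, stream, group):
        # Cursor 0 means the group starts delivery from the first entry.
        self._streams[stream]["groups"][group] = {"cursor": 0, "pending": set()}

    def xreadgroup(self, stream, group):
        # Deliver every not-yet-delivered entry and mark it as pending.
        s = self._streams[stream]
        g = s["groups"][group]
        new_ids = s["entries"][g["cursor"]:]
        g["cursor"] = len(s["entries"])
        g["pending"].update(new_ids)
        return new_ids

    def xack(self, stream, group, *ids):
        # Missing stream or missing group: nothing is pending, so return 0.
        groups = self._streams.get(stream, {}).get("groups", {})
        state = groups.get(group)
        if state is None:
            return 0
        acked = 0
        for entry_id in ids:
            if entry_id in state["pending"]:
                state["pending"].discard(entry_id)
                acked += 1
        return acked


toy = ToyStreams()
no_stream = toy.xack("stream", "group", "0-0")   # stream does not exist yet
m1, m2, m3 = (toy.xadd("stream") for _ in range(3))
no_group = toy.xack("stream", "group", m1)       # group does not exist yet
toy.xgroup_create("stream", "group")
delivered = toy.xreadgroup("stream", "group")    # all three become pending
first = toy.xack("stream", "group", m1)
rest = toy.xack("stream", "group", m2, m3)
again = toy.xack("stream", "group", m1)          # already acknowledged
```

In this model, acknowledging an ID a second time counts nothing, because the ID has already left the pending set.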

Callers

nothing calls this directly

Calls 4

xack · Method · 0.80
xadd · Method · 0.80
xgroup_create · Method · 0.80
xreadgroup · Method · 0.80

Tested by

no test coverage detected