hub / github.com/redis/redis-py / test_xnack_silent

Method test_xnack_silent

tests/test_asyncio/test_commands.py:5081–5090
(self, r: ClientT)

Source from the content-addressed store, hash-verified

5079
5080     @skip_if_server_version_lt("8.7.2")
5081     async def test_xnack_silent(self, r: ClientT):
5082         stream = "stream"
5083         group = "group"
5084         consumer = "consumer"
5085         m1 = await r.xadd(stream, {"foo": "bar"})
5086         m2 = await r.xadd(stream, {"foo": "bar"})
5087         await r.xgroup_create(stream, group, 0)
5088         await r.xreadgroup(group, consumer, streams={stream: ">"})
5089         result = await r.xnack(stream, group, "SILENT", m1, m2)
5090         assert result == 2
5091
5092     @skip_if_server_version_lt("8.7.2")
5093     async def test_xnack_fail(self, r: ClientT):

Callers

nothing calls this directly

Calls 4

xadd · Method · 0.80
xgroup_create · Method · 0.80
xreadgroup · Method · 0.80
xnack · Method · 0.80
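
The four calls listed above run in a fixed order: two entries are added, a group is created, the group reads the entries, and both IDs are then passed to xnack. The following is a minimal sketch of that ordering that runs without a server. FakeClient and run_xnack_silent_flow are hypothetical names introduced here for illustration and are not part of redis-py. The stand-in's return values are placeholders, not real server behavior, and the xnack argument order is inferred only from the call in the test above.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in that records awaited calls; not a redis-py class."""

    def __init__(self):
        self.calls = []
        self._next_id = 0

    async def xadd(self, stream, fields):
        self.calls.append("xadd")
        self._next_id += 1
        return f"{self._next_id}-0"  # placeholder entry ID

    async def xgroup_create(self, stream, group, id):
        self.calls.append("xgroup_create")
        return True

    async def xreadgroup(self, group, consumer, streams):
        self.calls.append("xreadgroup")
        return []  # placeholder; a real server would return the entries

    async def xnack(self, stream, group, mode, *ids):
        # Assumption: argument order mirrors the test's call site.
        self.calls.append("xnack")
        return len(ids)  # placeholder: echo the number of IDs passed


async def run_xnack_silent_flow(r):
    """Replay the same call sequence as the test body against client r."""
    stream, group, consumer = "stream", "group", "consumer"
    m1 = await r.xadd(stream, {"foo": "bar"})
    m2 = await r.xadd(stream, {"foo": "bar"})
    await r.xgroup_create(stream, group, 0)
    # Reading with ">" delivers new entries to the consumer in the group.
    await r.xreadgroup(group, consumer, streams={stream: ">"})
    return await r.xnack(stream, group, "SILENT", m1, m2)


if __name__ == "__main__":
    client = FakeClient()
    print(asyncio.run(run_xnack_silent_flow(client)), client.calls)
```

Because the stand-in only records method names, this checks the sequence of calls and nothing about how a Redis server handles them. The real test depends on a live server at the version named in its skip decorator.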

Tested by

no test coverage detected