hub / github.com/redis/redis-py / test_xnack_fail

Method test_xnack_fail

tests/test_commands.py:6861–6870
(self, r)

Source from the content-addressed store, hash-verified

@skip_if_server_version_lt("8.7.2")
def test_xnack_fail(self, r):
    stream = "stream"
    group = "group"
    consumer = "consumer"
    m1 = r.xadd(stream, {"foo": "bar"})
    r.xgroup_create(stream, group, 0)
    r.xreadgroup(group, consumer, streams={stream: ">"})
    # FAIL mode returns count of NACKed messages
    result = r.xnack(stream, group, "FAIL", m1)
    assert result == 1

@skip_if_server_version_lt("8.7.2")
def test_xnack_fatal(self, r):

Callers

nothing calls this directly

Calls 4

xadd · Method · 0.80
xgroup_create · Method · 0.80
xreadgroup · Method · 0.80
xnack · Method · 0.80
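
The sequence of the four calls listed above can be traced without a Redis server by using a stand-in client. The sketch below is only an illustration: `FakeStreamClient` is a hypothetical in-memory class, not part of redis-py, and it models just the one behavior the test asserts (the return value is the count of NACKed messages). What the server actually does to an entry in FAIL mode is not shown in the source, so the stand-in ignores the mode argument, and counting only IDs that are pending for the group is an assumption.

```python
from collections import defaultdict


class FakeStreamClient:
    """Hypothetical in-memory stand-in; NOT redis-py and NOT real server semantics."""

    def __init__(self):
        self._entries = defaultdict(list)    # stream -> [(entry_id, fields)]
        self._delivered = defaultdict(int)   # (stream, group) -> entries already delivered
        self._pending = defaultdict(dict)    # (stream, group) -> {entry_id: consumer}
        self._seq = 0

    def xadd(self, stream, fields):
        # Append an entry and return a simplified "<n>-0" style ID.
        self._seq += 1
        entry_id = f"{self._seq}-0"
        self._entries[stream].append((entry_id, dict(fields)))
        return entry_id

    def xgroup_create(self, stream, group, id="$"):
        # "$" starts after the current last entry; anything else starts at the beginning.
        start = len(self._entries[stream]) if id == "$" else 0
        self._delivered[(stream, group)] = start
        return True

    def xreadgroup(self, group, consumer, streams):
        # Only the ">" cursor (never-delivered entries) is modeled here.
        out = []
        for stream, cursor in streams.items():
            if cursor != ">":
                raise NotImplementedError("only '>' is modeled in this sketch")
            key = (stream, group)
            new = self._entries[stream][self._delivered[key]:]
            self._delivered[key] = len(self._entries[stream])
            for entry_id, _fields in new:
                self._pending[key][entry_id] = consumer
            if new:
                out.append([stream, new])
        return out

    def xnack(self, stream, group, mode, *ids):
        # Assumption: count only IDs currently pending for this group.
        # The mode ("FAIL" in the test) is accepted but not interpreted.
        pending = self._pending[(stream, group)]
        return sum(1 for entry_id in ids if entry_id in pending)


# Same steps as test_xnack_fail, run against the stand-in.
r = FakeStreamClient()
m1 = r.xadd("stream", {"foo": "bar"})
r.xgroup_create("stream", "group", 0)
r.xreadgroup("group", "consumer", streams={"stream": ">"})
result = r.xnack("stream", "group", "FAIL", m1)
print(result)  # 1
```

The `xreadgroup` call matters for the count in this sketch: it is what moves `m1` into the group's pending set, so skipping it would make the stand-in return 0. Whether a real server behaves the same way for an undelivered ID is not stated in the source.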

Tested by

no test coverage detected