hub / github.com/redis/redis-py / test_xnack_force

Method test_xnack_force

tests/test_commands.py:6923–6930
(self, r)

Source from the content-addressed store, hash-verified

6921
6922      @skip_if_server_version_lt("8.7.2")
6923      def test_xnack_force(self, r):
6924          stream = "stream"
6925          group = "group"
6926          m1 = r.xadd(stream, {"foo": "bar"})
6927          r.xgroup_create(stream, group, 0)
6928          # FORCE creates unowned PEL entries for IDs not already in PEL
6929          result = r.xnack(stream, group, "FAIL", m1, force=True)
6930          assert result == 1
6931
6932      @skip_if_server_version_lt("8.7.2")
6933      def test_xnack_invalid_mode(self, r):
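
The comment in the test says that FORCE creates unowned PEL entries for IDs that are not yet pending. A minimal in-memory sketch of that idea follows. It is a toy model, not the server's or redis-py's implementation: ToyGroup, PendingEntry and nack are hypothetical names, the mode argument ("FAIL" in the test) is not modelled, and the behaviour without force (unknown IDs are skipped and not counted) is an assumption made only for contrast.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PendingEntry:
    # consumer is None for an "unowned" entry, i.e. no consumer holds it
    consumer: Optional[str] = None


@dataclass
class ToyGroup:
    """Hypothetical stand-in for a consumer group's pending entries list."""

    pel: Dict[str, PendingEntry] = field(default_factory=dict)

    def nack(self, *ids: str, force: bool = False) -> int:
        """Return how many of the given IDs were handled (toy semantics)."""
        handled = 0
        for msg_id in ids:
            if msg_id in self.pel:
                # Already pending: counted, entry left untouched in this toy.
                handled += 1
            elif force:
                # Not pending yet: force creates an entry with no owner.
                self.pel[msg_id] = PendingEntry()
                handled += 1
            # Assumption: without force, unknown IDs are silently skipped.
        return handled


group = ToyGroup()
skipped = group.nack("1-0")              # nothing pending, no force
forced = group.nack("1-0", force=True)   # force creates the entry
```

In this sketch the forced call handles one ID and leaves an entry whose consumer is None, which mirrors the `result == 1` assertion in the test where the message was added but never read by any consumer.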

Callers

nothing calls this directly

Calls 3

xadd · Method · 0.80
xgroup_create · Method · 0.80
xnack · Method · 0.80

Tested by

no test coverage detected