(self, r)
| 6921 | |
| 6922 | @skip_if_server_version_lt("8.7.2") |
| 6923 | def test_xnack_force(self, r): |
| 6924 | stream = "stream" |
| 6925 | group = "group" |
| 6926 | m1 = r.xadd(stream, {"foo": "bar"}) |
| 6927 | r.xgroup_create(stream, group, 0) |
| 6928 | # FORCE creates unowned PEL entries for IDs not already in PEL |
| 6929 | result = r.xnack(stream, group, "FAIL", m1, force=True) |
| 6930 | assert result == 1 |
| 6931 | |
| 6932 | @skip_if_server_version_lt("8.7.2") |
| 6933 | def test_xnack_invalid_mode(self, r): |
nothing calls this directly
no test coverage detected