MCPcopy
hub / github.com/redis/redis-py / test_xadd_with_options

Method test_xadd_with_options

tests/test_commands.py:7722–7747  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

7720
7721 @skip_if_server_version_lt("8.1.224")
7722 def test_xadd_with_options(self, r):
7723 stream = "stream"
7724
7725 # Test XADD with KEEPREF ref_policy
7726 r.xadd(
7727 stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
7728 )
7729 r.xadd(
7730 stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
7731 )
7732 r.xadd(
7733 stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="KEEPREF"
7734 )
7735 assert r.xlen(stream) == 2
7736
7737 # Test XADD with DELREF ref_policy
7738 r.xadd(stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="DELREF")
7739 assert r.xlen(stream) == 2
7740
7741 # Test XADD with ACKED ref_policy
7742 r.xadd(stream, {"foo": "bar"}, maxlen=2, approximate=False, ref_policy="ACKED")
7743 assert r.xlen(stream) == 2
7744
7745 # Test error case
7746 with pytest.raises(redis.DataError):
7747 r.xadd(stream, {"foo": "bar"}, ref_policy="INVALID")
7748
7749 @skip_if_server_version_lt("8.5.0")
7750 def test_xadd_idmpauto(self, r):

Callers

nothing calls this directly

Calls 2

xaddMethod · 0.80
xlenMethod · 0.80

Tested by

no test coverage detected