MCPcopy
hub / github.com/redis/redis-py / test_xadd_idmp

Method test_xadd_idmp

tests/test_commands.py:7772–7794  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

7770
@skip_if_server_version_lt("8.5.0")
def test_xadd_idmp(self, r):
    """Check how ``xadd`` treats the ``idmp=(producer, iid)`` argument.

    Gated by ``skip_if_server_version_lt("8.5.0")``.

    The test writes the same field/value payload to one stream several
    times and asserts that:

    * repeating an identical ``(producer, iid)`` pair returns the ID of
      the first write;
    * changing only the iid yields a different ID;
    * changing only the producer yields a different ID;
    * a one-byte binary iid is accepted;
    * the stream ends up holding exactly four entries.
    """
    key = "stream"

    def add(producer, iid):
        # Build a fresh payload dict per call, as each write is independent.
        return r.xadd(key, {"field1": "value1"}, idmp=(producer, iid))

    # Initial write, followed by an exact repeat of the same producer/iid.
    first_id = add("producer1", b"msg1")
    repeated_id = add("producer1", b"msg1")
    assert first_id == repeated_id

    # Same producer, new iid: must not collapse onto the first entry.
    new_iid_id = add("producer1", b"msg2")
    assert new_iid_id != first_id

    # New producer, original iid: must not collapse onto the first entry.
    new_producer_id = add("producer2", b"msg1")
    assert new_producer_id != first_id

    # A shorter, non-printable binary iid; the returned ID is not checked.
    add("producer1", b"\x01")

    # Five writes were issued, one of them a duplicate: four entries remain.
    assert r.xlen(key) == 4
7795
7796 @skip_if_server_version_lt("8.5.0")
7797 def test_xadd_idmp_validation(self, r):

Callers

nothing calls this directly

Calls 2

xadd · Method · 0.80
xlen · Method · 0.80

Tested by

no test coverage detected