MCPcopy
hub / github.com/redis/redis-py / test_xadd_idmpauto

Method test_xadd_idmpauto

tests/test_commands.py:7750–7769  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

7748
@skip_if_server_version_lt("8.5.0")
def test_xadd_idmpauto(self, r):
    """Check how XADD with ``idmpauto`` treats repeated and distinct writes.

    Four writes are issued against one stream through the client fixture
    ``r``; the test asserts that:

    * repeating the same fields for the same producer yields the same ID,
    * changing the field value for that producer yields a different ID,
    * the original fields sent by another producer yield a different ID,
    * the stream ends up holding exactly three entries.
    """
    stream = "stream"

    def write(value, producer):
        # Every write targets the same stream and the same field name;
        # only the field value and the idmpauto producer vary.
        return r.xadd(stream, {"field1": value}, idmpauto=producer)

    # First write for producer1.
    original_id = write("value1", "producer1")

    # Identical fields + identical producer: the ID must be returned again.
    repeated_id = write("value1", "producer1")
    assert original_id == repeated_id

    # Same producer, different field value: must not reuse the first ID.
    changed_content_id = write("value2", "producer1")
    assert changed_content_id != original_id

    # Different producer, original field value: must not reuse the first ID.
    other_producer_id = write("value1", "producer2")
    assert other_producer_id != original_id

    # Two distinct entries from producer1 plus one from producer2.
    entry_count = r.xlen(stream)
    assert entry_count == 3
7770
7771 @skip_if_server_version_lt("8.5.0")
7772 def test_xadd_idmp(self, r):

Callers

nothing calls this directly

Calls 2

xadd (Method) · 0.80
xlen (Method) · 0.80

Tested by

no test coverage detected