MCPcopy
hub / github.com/redis/redis-py / test_xadd_idmpauto

Method test_xadd_idmpauto

tests/test_asyncio/test_commands.py:5939–5958  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

5937
5938 @skip_if_server_version_lt("8.5.0")
5939 async def test_xadd_idmpauto(self, r: redis.Redis):
5940 stream = "stream"
5941
5942 # XADD with IDMPAUTO - first write
5943 message_id1 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
5944
5945 # Test XADD with IDMPAUTO - duplicate write returns same ID
5946 message_id2 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer1")
5947 assert message_id1 == message_id2
5948
5949 # Test XADD with IDMPAUTO - different content creates new entry
5950 message_id3 = await r.xadd(stream, {"field1": "value2"}, idmpauto="producer1")
5951 assert message_id3 != message_id1
5952
5953 # Test XADD with IDMPAUTO - different producer creates new entry
5954 message_id4 = await r.xadd(stream, {"field1": "value1"}, idmpauto="producer2")
5955 assert message_id4 != message_id1
5956
5957 # Verify stream has 3 entries (2 unique from producer1, 1 from producer2)
5958 assert await r.xlen(stream) == 3
5959
5960 @skip_if_server_version_lt("8.5.0")
5961 async def test_xadd_idmp(self, r: redis.Redis):

Callers

nothing calls this directly

Calls 2

xaddMethod · 0.80
xlenMethod · 0.80

Tested by

no test coverage detected