MCPcopy
hub / github.com/redis/redis-py / test_xadd_idmp

Method test_xadd_idmp

tests/test_asyncio/test_commands.py:5961–5991  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

5959
@skip_if_server_version_lt("8.5.0")
async def test_xadd_idmp(self, r: redis.Redis):
    """Check how XADD treats repeated and distinct ``idmp`` pairs.

    Each write passes ``idmp=(producer, iid)``. The test asserts that
    repeating a pair yields the ID of the first write, that changing
    either the iid or the producer yields a different ID, and that the
    stream holds four entries after the five writes.
    """
    key = "stream"

    async def add(producer, iid):
        # Every write uses the same field/value payload; only the idmp
        # pair varies between calls.
        return await r.xadd(key, {"field1": "value1"}, idmp=(producer, iid))

    original_id = await add("producer1", b"msg1")

    # Same producer and iid again: the ID of the first write must come back.
    repeated_id = await add("producer1", b"msg1")
    assert original_id == repeated_id

    # Same producer, new iid: a different ID is expected.
    new_iid_id = await add("producer1", b"msg2")
    assert new_iid_id != original_id

    # New producer, reused iid: also a different ID.
    new_producer_id = await add("producer2", b"msg1")
    assert new_producer_id != original_id

    # A one-byte binary iid; the returned ID is not inspected.
    await add("producer1", b"\x01")

    # Five writes, one of them a duplicate pair.
    assert await r.xlen(key) == 4
5992
5993 @skip_if_server_version_lt("8.5.0")
5994 async def test_xadd_idmp_validation(self, r: redis.Redis):

Callers

nothing calls this directly

Calls 2

xaddMethod · 0.80
xlenMethod · 0.80

Tested by

no test coverage detected