MCPcopy
hub / github.com/redis/redis-py / test_xtrim

Method test_xtrim

tests/test_asyncio/test_commands.py:5770–5786  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

5768
5769 @skip_if_server_version_lt("5.0.0")
5770 async def test_xtrim(self, r: redis.Redis):
5771 stream = "stream"
5772
5773 # trimming an empty key doesn't do anything
5774 assert await r.xtrim(stream, 1000) == 0
5775
5776 await r.xadd(stream, {"foo": "bar"})
5777 await r.xadd(stream, {"foo": "bar"})
5778 await r.xadd(stream, {"foo": "bar"})
5779 await r.xadd(stream, {"foo": "bar"})
5780
5781 # trimming an amount large than the number of messages
5782 # doesn't do anything
5783 assert await r.xtrim(stream, 5, approximate=False) == 0
5784
5785 # 1 message is trimmed
5786 assert await r.xtrim(stream, 3, approximate=False) == 1
5787
5788 @skip_if_server_version_lt("8.1.224")
5789 async def test_xdelex(self, r: redis.Redis):

Callers

nothing calls this directly

Calls 2

xtrimMethod · 0.80
xaddMethod · 0.80

Tested by

no test coverage detected