MCPcopy
hub / github.com/redis/redis-py / test_xinfo_consumers

Method test_xinfo_consumers

tests/test_commands.py:6716–6740  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

6714
6715 @skip_if_server_version_lt("7.2.0")
6716 def test_xinfo_consumers(self, r):
6717 stream = "stream"
6718 group = "group"
6719 consumer1 = "consumer1"
6720 consumer2 = "consumer2"
6721 r.xadd(stream, {"foo": "bar"})
6722 r.xadd(stream, {"foo": "bar"})
6723 r.xadd(stream, {"foo": "bar"})
6724
6725 r.xgroup_create(stream, group, 0)
6726 r.xreadgroup(group, consumer1, streams={stream: ">"}, count=1)
6727 r.xreadgroup(group, consumer2, streams={stream: ">"})
6728 info = r.xinfo_consumers(stream, group)
6729 assert len(info) == 2
6730 expected = [
6731 {"name": consumer1.encode(), "pending": 1},
6732 {"name": consumer2.encode(), "pending": 2},
6733 ]
6734
6735 # we can't determine the idle and inactive time, so just make sure it's an int
6736 assert isinstance(info[0].pop("idle"), int)
6737 assert isinstance(info[1].pop("idle"), int)
6738 assert isinstance(info[0].pop("inactive"), int)
6739 assert isinstance(info[1].pop("inactive"), int)
6740 assert info == expected
6741
6742 @skip_if_server_version_lt("7.0.0")
6743 def test_xinfo_stream(self, r):

Callers

nothing calls this directly

Calls 5

xaddMethod · 0.80
xgroup_createMethod · 0.80
xreadgroupMethod · 0.80
xinfo_consumersMethod · 0.80
encodeMethod · 0.80

Tested by

no test coverage detected