hub / github.com/redis/redis-py / test_xinfo_stream_full

Method test_xinfo_stream_full

tests/test_commands.py:6763–6785
(self, r)

Source from the content-addressed store, hash-verified

@skip_if_server_version_lt("6.0.0")
def test_xinfo_stream_full(self, r):
    stream = "stream"
    group = "group"
    m1 = r.xadd(stream, {"foo": "bar"})
    info = r.xinfo_stream(stream, full=True)
    assert info["length"] == 1
    assert len(info["groups"]) == 0

    r.xgroup_create(stream, group, 0)
    info = r.xinfo_stream(stream, full=True)
    assert info["length"] == 1
    assert_resp_response_in(
        r,
        m1,
        info["entries"],
        info["entries"].keys(),
    )
    assert len(info["groups"]) == 1

    r.xreadgroup(group, "consumer", streams={stream: ">"})
    info = r.xinfo_stream(stream, full=True)
    consumer = info["groups"][0]["consumers"][0]
    assert isinstance(consumer, dict)

@skip_if_server_version_lt("8.5.0")
def test_xinfo_stream_idempotent_fields(self, r):
    ...  # body lies outside the displayed range

Callers

nothing calls this directly

Calls 6

assert_resp_response_in · Function · 0.85
xadd · Method · 0.80
xinfo_stream · Method · 0.80
xgroup_create · Method · 0.80
keys · Method · 0.80
xreadgroup · Method · 0.80
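
The test passes assert_resp_response_in the client, the entry ID returned by xadd, and two candidate containers. This page does not show the helper's body, so the following is only a sketch under one assumption: that the helper checks membership against the first container for RESP2 connections and against the second for RESP3. The function name `assert_response_in_sketch` and its `protocol` parameter are hypothetical; the real helper receives the client `r` rather than a protocol number.

```python
def assert_response_in_sketch(protocol, response, resp2_expected, resp3_expected):
    """Hypothetical stand-in for a protocol-aware membership check."""
    # Assumption: RESP2 applies when the protocol is 2, "2", or unset (None).
    if protocol in (2, "2", None):
        assert response in resp2_expected
    else:
        assert response in resp3_expected


# Made-up data shaped like the test above: a mapping of entry IDs to fields.
entries = {"1-0": {"foo": "bar"}}

# Either branch accepts the ID, since a dict and its keys() view
# answer membership queries the same way.
assert_response_in_sketch(2, "1-0", entries, entries.keys())
assert_response_in_sketch(3, "1-0", entries, entries.keys())
```

Under this assumption, a single call site can cover both protocol versions, with each candidate container matching the shape the corresponding protocol returns.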

Tested by

no test coverage detected