MCPcopy
hub / github.com/redis/redis-py / test_xreadgroup

Method test_xreadgroup

tests/test_commands.py:7129–7208  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

7127
7128 @skip_if_server_version_lt("5.0.0")
7129 def test_xreadgroup(self, r):
7130 stream = "stream"
7131 group = "group"
7132 consumer = "consumer"
7133 m1 = r.xadd(stream, {"foo": "bar"})
7134 m2 = r.xadd(stream, {"bing": "baz"})
7135 r.xgroup_create(stream, group, 0)
7136
7137 stream_name = stream.encode()
7138 expected_entries = [
7139 get_stream_message(r, stream, m1),
7140 get_stream_message(r, stream, m2),
7141 ]
7142
7143 # xread starting at 0 returns both messages
7144 assert_resp_response(
7145 r,
7146 r.xreadgroup(group, consumer, streams={stream: ">"}),
7147 [[stream_name, expected_entries]],
7148 {stream_name: [expected_entries]},
7149 {stream_name: expected_entries},
7150 )
7151
7152 r.xgroup_destroy(stream, group)
7153 r.xgroup_create(stream, group, 0)
7154
7155 expected_entries = [get_stream_message(r, stream, m1)]
7156
7157 # xread with count=1 returns only the first message
7158 assert_resp_response(
7159 r,
7160 r.xreadgroup(group, consumer, streams={stream: ">"}, count=1),
7161 [[stream_name, expected_entries]],
7162 {stream_name: [expected_entries]},
7163 {stream_name: expected_entries},
7164 )
7165
7166 r.xgroup_destroy(stream, group)
7167
7168 # create the group using $ as the last id meaning subsequent reads
7169 # will only find messages added after this
7170 r.xgroup_create(stream, group, "$")
7171
7172 # xread starting after the last message returns an empty message list
7173 assert_resp_response(
7174 r, r.xreadgroup(group, consumer, streams={stream: ">"}), [], {}
7175 )
7176
7177 # xreadgroup with noack does not have any items in the PEL
7178 r.xgroup_destroy(stream, group)
7179 r.xgroup_create(stream, group, "0")
7180 res = r.xreadgroup(group, consumer, streams={stream: ">"}, noack=True)
7181 empty_res = r.xreadgroup(group, consumer, streams={stream: "0"})
7182 shape = expected_response_shape(r)
7183 if shape == "legacy_resp2":
7184 assert len(res[0][1]) == 2
7185 # now there should be nothing pending
7186 assert len(empty_res[0][1]) == 0

Callers

nothing calls this directly

Calls 9

assert_resp_response · Function · 0.85
expected_response_shape · Function · 0.85
xadd · Method · 0.80
xgroup_create · Method · 0.80
encode · Method · 0.80
xreadgroup · Method · 0.80
xgroup_destroy · Method · 0.80
xtrim · Method · 0.80
get_stream_message · Function · 0.70

Tested by

no test coverage detected