(self, r)
| 7127 | |
| 7128 | @skip_if_server_version_lt("5.0.0") |
| 7129 | def test_xreadgroup(self, r): |
| 7130 | stream = "stream" |
| 7131 | group = "group" |
| 7132 | consumer = "consumer" |
| 7133 | m1 = r.xadd(stream, {"foo": "bar"}) |
| 7134 | m2 = r.xadd(stream, {"bing": "baz"}) |
| 7135 | r.xgroup_create(stream, group, 0) |
| 7136 | |
| 7137 | stream_name = stream.encode() |
| 7138 | expected_entries = [ |
| 7139 | get_stream_message(r, stream, m1), |
| 7140 | get_stream_message(r, stream, m2), |
| 7141 | ] |
| 7142 | |
| 7143 | # xread starting at 0 returns both messages |
| 7144 | assert_resp_response( |
| 7145 | r, |
| 7146 | r.xreadgroup(group, consumer, streams={stream: ">"}), |
| 7147 | [[stream_name, expected_entries]], |
| 7148 | {stream_name: [expected_entries]}, |
| 7149 | {stream_name: expected_entries}, |
| 7150 | ) |
| 7151 | |
| 7152 | r.xgroup_destroy(stream, group) |
| 7153 | r.xgroup_create(stream, group, 0) |
| 7154 | |
| 7155 | expected_entries = [get_stream_message(r, stream, m1)] |
| 7156 | |
| 7157 | # xread with count=1 returns only the first message |
| 7158 | assert_resp_response( |
| 7159 | r, |
| 7160 | r.xreadgroup(group, consumer, streams={stream: ">"}, count=1), |
| 7161 | [[stream_name, expected_entries]], |
| 7162 | {stream_name: [expected_entries]}, |
| 7163 | {stream_name: expected_entries}, |
| 7164 | ) |
| 7165 | |
| 7166 | r.xgroup_destroy(stream, group) |
| 7167 | |
| 7168 | # create the group using $ as the last id meaning subsequent reads |
| 7169 | # will only find messages added after this |
| 7170 | r.xgroup_create(stream, group, "$") |
| 7171 | |
| 7172 | # xread starting after the last message returns an empty message list |
| 7173 | assert_resp_response( |
| 7174 | r, r.xreadgroup(group, consumer, streams={stream: ">"}), [], {} |
| 7175 | ) |
| 7176 | |
| 7177 | # xreadgroup with noack does not have any items in the PEL |
| 7178 | r.xgroup_destroy(stream, group) |
| 7179 | r.xgroup_create(stream, group, "0") |
| 7180 | res = r.xreadgroup(group, consumer, streams={stream: ">"}, noack=True) |
| 7181 | empty_res = r.xreadgroup(group, consumer, streams={stream: "0"}) |
| 7182 | shape = expected_response_shape(r) |
| 7183 | if shape == "legacy_resp2": |
| 7184 | assert len(res[0][1]) == 2 |
| 7185 | # now there should be nothing pending |
| 7186 | assert len(empty_res[0][1]) == 0 |
nothing calls this directly
no test coverage detected