hub / github.com/redis/redis-py / test_xgroup_create_mkstream

Method test_xgroup_create_mkstream

tests/test_commands.py:6550–6573
(self, r)

Source from the content-addressed store, hash-verified

@skip_if_server_version_lt("7.0.0")
def test_xgroup_create_mkstream(self, r):
    # tests xgroup_create and xinfo_groups
    stream = "stream"
    group = "group"

    # an error is raised if a group is created on a stream that
    # doesn't already exist
    with pytest.raises(exceptions.ResponseError):
        r.xgroup_create(stream, group, 0)

    # however, with mkstream=True, the underlying stream is created
    # automatically
    assert r.xgroup_create(stream, group, 0, mkstream=True)
    expected = [
        {
            "name": group.encode(),
            "consumers": 0,
            "pending": 0,
            "last-delivered-id": b"0-0",
            "entries-read": None,
            "lag": 0,
        }
    ]
    assert r.xinfo_groups(stream) == expected
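Example (not from the repository)

The test shows that xgroup_create fails on a missing stream unless mkstream=True is passed. A minimal sketch of how application code might rely on that behaviour follows. The helper name ensure_group is hypothetical and not part of redis-py. It assumes the client exposes xgroup_create(name, groupname, id=..., mkstream=...) as redis-py does, and that an already existing group is reported with a BUSYGROUP error message.

```python
def ensure_group(client, stream, group, start_id="0"):
    """Create `group` on `stream`, creating the stream too if it is missing.

    Hypothetical helper, not part of redis-py. Returns True if the group
    was created by this call and False if it already existed.
    """
    try:
        # mkstream=True avoids the error raised for a non-existent stream.
        client.xgroup_create(stream, group, id=start_id, mkstream=True)
        return True
    except Exception as exc:
        # Assumption: with redis-py this is a redis.exceptions.ResponseError
        # whose message starts with BUSYGROUP when the group already exists.
        # The check is on the message so the sketch does not import redis.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

With a real client this would be called as ensure_group(r, "stream", "group"). Calling it twice is safe: the second call returns False instead of raising.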

Callers

nothing calls this directly

Calls 3

xgroup_create · Method · 0.80
encode · Method · 0.80
xinfo_groups · Method · 0.80

Tested by

no test coverage detected