MCPcopy
hub / github.com/redis/redis-py / test_xgroup_create_entriesread

Method test_xgroup_create_entriesread

tests/test_commands.py:6577–6596  ·  view source on GitHub ↗
(self, r: redis.Redis)

Source from the content-addressed store, hash-verified

6575 @skip_if_server_version_lt("7.0.0")
6576 @skip_if_server_version_gte("8.2.2")
6577 def test_xgroup_create_entriesread(self, r: redis.Redis):
6578 stream = "stream"
6579 group = "group"
6580 r.xadd(stream, {"foo": "bar"})
6581
6582 # no group is setup yet, no info to obtain
6583 assert r.xinfo_groups(stream) == []
6584
6585 assert r.xgroup_create(stream, group, 0, entries_read=7)
6586 expected = [
6587 {
6588 "name": group.encode(),
6589 "consumers": 0,
6590 "pending": 0,
6591 "last-delivered-id": b"0-0",
6592 "entries-read": 7,
6593 "lag": 1,
6594 }
6595 ]
6596 assert r.xinfo_groups(stream) == expected
6597
6598 @skip_if_server_version_lt("8.2.2")
6599 def test_xgroup_create_entriesread_fixed_max_entries_read(self, r: redis.Redis):

Callers

nothing calls this directly

Calls 4

xaddMethod · 0.80
xinfo_groupsMethod · 0.80
xgroup_createMethod · 0.80
encodeMethod · 0.80

Tested by

no test coverage detected