MCPcopy
hub / github.com/IBM/sarama / NewOffsetFetchRequest

Function NewOffsetFetchRequest

offset_fetch_request.go:29–70  ·  view source on GitHub ↗
(
	version KafkaVersion,
	group string,
	partitions map[string][]int32,
)

Source from the content-addressed store, hash-verified

27}
28
29func NewOffsetFetchRequest(
30 version KafkaVersion,
31 group string,
32 partitions map[string][]int32,
33) *OffsetFetchRequest {
34 request := &OffsetFetchRequest{
35 ConsumerGroup: group,
36 partitions: partitions,
37 }
38 if version.IsAtLeast(V3_0_0_0) {
39 // Version 8 is adding support for fetching offsets for multiple groups at a time.
40 request.Version = 8
41 request.Groups = []OffsetFetchRequestGroup{{GroupId: group, Partitions: partitions}}
42 return request
43 }
44 if version.IsAtLeast(V2_5_0_0) {
45 // Version 7 is adding the require stable flag.
46 request.Version = 7
47 } else if version.IsAtLeast(V2_4_0_0) {
48 // Version 6 is the first flexible version.
49 request.Version = 6
50 } else if version.IsAtLeast(V2_1_0_0) {
51 // Version 3, 4, and 5 are the same as version 2.
52 request.Version = 5
53 } else if version.IsAtLeast(V2_0_0_0) {
54 request.Version = 4
55 } else if version.IsAtLeast(V0_11_0_0) {
56 request.Version = 3
57 } else if version.IsAtLeast(V0_10_2_0) {
58 // Starting in version 2, the request can contain a null topics array to indicate that offsets
59 // for all topics should be fetched. It also returns a top level error code
60 // for group or coordinator level errors.
61 request.Version = 2
62 } else if version.IsAtLeast(V0_8_2_0) {
63 // In version 0, the request read offsets from ZK.
64 //
65 // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
66 request.Version = 1
67 }
68
69 return request
70}
71
72func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
73 if r.Version < 0 || r.Version > 8 {

Calls 1

IsAtLeastMethod · 0.80

Tested by 1