MCPcopy
hub / github.com/segmentio/kafka-go / testConsumerGroupFetchOffsets

Function testConsumerGroupFetchOffsets

client_test.go:130–192  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, client *Client)

Source from the content-addressed store, hash-verified

128}
129
130func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, client *Client) {
131 const totalMessages = 144
132 const partitions = 12
133 const msgPerPartition = totalMessages / partitions
134
135 topic := makeTopic()
136 if err := clientCreateTopic(client, topic, partitions); err != nil {
137 t.Fatal(err)
138 }
139
140 groupId := makeGroupID()
141 brokers := []string{"localhost:9092"}
142
143 writer := &Writer{
144 Addr: TCP(brokers...),
145 Topic: topic,
146 Balancer: &RoundRobin{},
147 BatchSize: 1,
148 Transport: client.Transport,
149 }
150 if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil {
151 t.Fatalf("bad write messages: %v", err)
152 }
153 if err := writer.Close(); err != nil {
154 t.Fatalf("bad write err: %v", err)
155 }
156
157 r := NewReader(ReaderConfig{
158 Brokers: brokers,
159 Topic: topic,
160 GroupID: groupId,
161 MinBytes: 1,
162 MaxBytes: 10e6,
163 MaxWait: 100 * time.Millisecond,
164 })
165 defer r.Close()
166
167 for i := 0; i < totalMessages; i++ {
168 m, err := r.FetchMessage(ctx)
169 if err != nil {
170 t.Fatalf("error fetching message: %s", err)
171 }
172 if err := r.CommitMessages(context.Background(), m); err != nil {
173 t.Fatal(err)
174 }
175 }
176
177 offsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{GroupId: groupId, Topic: topic})
178 if err != nil {
179 t.Fatal(err)
180 }
181
182 if len(offsets) != partitions {
183 t.Fatalf("expected %d partitions but only received offsets for %d", partitions, len(offsets))
184 }
185
186 for i := 0; i < partitions; i++ {
187 committedOffset := offsets[i]

Callers

nothing calls this directly

Calls 12

WriteMessagesMethod · 0.95
CloseMethod · 0.95
CloseMethod · 0.95
FetchMessageMethod · 0.95
CommitMessagesMethod · 0.95
makeGroupIDFunction · 0.85
TCPFunction · 0.85
makeTestSequenceFunction · 0.85
NewReaderFunction · 0.85
ConsumerOffsetsMethod · 0.80
makeTopicFunction · 0.70
clientCreateTopicFunction · 0.70

Tested by

no test coverage detected