MCP · copy
hub / github.com/segmentio/kafka-go / TestClientPipeline

Function TestClientPipeline

fetch_test.go:209–285  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

207}
208
209func TestClientPipeline(t *testing.T) {
210 client, topic, shutdown := newLocalClientAndTopic()
211 defer shutdown()
212
213 const numBatches = 100
214 const recordsPerBatch = 30
215
216 unixEpoch := time.Unix(0, 0)
217 records := make([]Record, recordsPerBatch)
218 content := []byte("1234567890")
219
220 for i := 0; i < numBatches; i++ {
221 for j := range records {
222 records[j] = Record{Value: NewBytes(content)}
223 }
224
225 _, err := client.Produce(context.Background(), &ProduceRequest{
226 Topic: topic,
227 RequiredAcks: -1,
228 Records: NewRecordReader(records...),
229 Compression: Snappy,
230 })
231 if err != nil {
232 t.Fatal(err)
233 }
234 }
235
236 offset := int64(0)
237
238 for i := 0; i < (numBatches * recordsPerBatch); {
239 req := &FetchRequest{
240 Topic: topic,
241 Offset: offset,
242 MinBytes: 1,
243 MaxBytes: 8192,
244 MaxWait: 500 * time.Millisecond,
245 }
246
247 res, err := client.Fetch(context.Background(), req)
248 if err != nil {
249 t.Fatal(err)
250 }
251
252 if res.Error != nil {
253 t.Fatal(res.Error)
254 }
255
256 for {
257 r, err := res.Records.ReadRecord()
258 if err != nil {
259 if errors.Is(err, io.EOF) {
260 break
261 }
262 t.Fatal(err)
263 }
264
265 if r.Key != nil {
266 r.Key.Close()

Callers

nothing calls this directly

Calls 8

Produce (Method) · 0.80
Fetch (Method) · 0.80
IsZero (Method) · 0.80
newLocalClientAndTopic (Function) · 0.70
NewBytes (Function) · 0.70
NewRecordReader (Function) · 0.70
ReadRecord (Method) · 0.65
Close (Method) · 0.45

Tested by

no test coverage detected