MCPcopy
hub / github.com/segmentio/kafka-go / prepareReader

Function prepareReader

reader_test.go:504–525  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader, msgs ...Message)

Source from the content-addressed store, hash-verified

502}
503
504func prepareReader(t *testing.T, ctx context.Context, r *Reader, msgs ...Message) {
505 config := r.Config()
506 var conn *Conn
507 var err error
508
509 for {
510 if conn, err = DialLeader(ctx, "tcp", "localhost:9092", config.Topic, config.Partition); err == nil {
511 break
512 }
513 select {
514 case <-time.After(time.Second):
515 case <-ctx.Done():
516 t.Fatal(ctx.Err())
517 }
518 }
519
520 defer conn.Close()
521
522 if _, err := conn.WriteMessages(msgs...); err != nil {
523 t.Fatal(err)
524 }
525}
526
527var (
528 benchmarkReaderOnce sync.Once

Callers 15

testReaderReadMessagesFunction · 0.85
testReaderSetOffsetAtFunction · 0.85
testReaderLagFunction · 0.85
testReaderReadLagFunction · 0.85
TestCloseLeavesGroupFunction · 0.85

Calls 6

CloseMethod · 0.95
WriteMessagesMethod · 0.95
DialLeaderFunction · 0.85
ConfigMethod · 0.80
DoneMethod · 0.80
ErrMethod · 0.45

Tested by

no test coverage detected