MCPcopy
hub / github.com/segmentio/kafka-go / BenchmarkReader

Function BenchmarkReader

reader_test.go:533–579  ·  view source on GitHub ↗
(b *testing.B)

Source from the content-addressed store, hash-verified

531)
532
533func BenchmarkReader(b *testing.B) {
534 const broker = "localhost:9092"
535 ctx := context.Background()
536
537 benchmarkReaderOnce.Do(func() {
538 conn, err := DialLeader(ctx, "tcp", broker, benchmarkReaderTopic, 0)
539 if err != nil {
540 b.Fatal(err)
541 }
542 defer conn.Close()
543
544 msgs := make([]Message, 1000)
545 for i := range msgs {
546 msgs[i].Value = benchmarkReaderPayload
547 }
548
549 for i := 0; i != 10; i++ { // put 10K messages
550 if _, err := conn.WriteMessages(msgs...); err != nil {
551 b.Fatal(err)
552 }
553 }
554
555 b.ResetTimer()
556 })
557
558 r := NewReader(ReaderConfig{
559 Brokers: []string{broker},
560 Topic: benchmarkReaderTopic,
561 Partition: 0,
562 MinBytes: 1e3,
563 MaxBytes: 1e6,
564 MaxWait: 100 * time.Millisecond,
565 })
566
567 for i := 0; i < b.N; i++ {
568 if (i % 10000) == 0 {
569 r.SetOffset(-1)
570 }
571 _, err := r.ReadMessage(ctx)
572 if err != nil {
573 b.Fatal(err)
574 }
575 }
576
577 r.Close()
578 b.SetBytes(int64(len(benchmarkReaderPayload)))
579}
580
581func TestCloseLeavesGroup(t *testing.T) {
582 if os.Getenv("KAFKA_VERSION") == "2.3.1" {

Callers

nothing calls this directly

Calls 7

SetOffsetMethod · 0.95
ReadMessageMethod · 0.95
CloseMethod · 0.95
DialLeaderFunction · 0.85
NewReaderFunction · 0.85
CloseMethod · 0.45
WriteMessagesMethod · 0.45

Tested by

no test coverage detected