(b *testing.B)
| 531 | ) |
| 532 | |
| 533 | func BenchmarkReader(b *testing.B) { |
| 534 | const broker = "localhost:9092" |
| 535 | ctx := context.Background() |
| 536 | |
| 537 | benchmarkReaderOnce.Do(func() { |
| 538 | conn, err := DialLeader(ctx, "tcp", broker, benchmarkReaderTopic, 0) |
| 539 | if err != nil { |
| 540 | b.Fatal(err) |
| 541 | } |
| 542 | defer conn.Close() |
| 543 | |
| 544 | msgs := make([]Message, 1000) |
| 545 | for i := range msgs { |
| 546 | msgs[i].Value = benchmarkReaderPayload |
| 547 | } |
| 548 | |
| 549 | for i := 0; i != 10; i++ { // put 10K messages |
| 550 | if _, err := conn.WriteMessages(msgs...); err != nil { |
| 551 | b.Fatal(err) |
| 552 | } |
| 553 | } |
| 554 | |
| 555 | b.ResetTimer() |
| 556 | }) |
| 557 | |
| 558 | r := NewReader(ReaderConfig{ |
| 559 | Brokers: []string{broker}, |
| 560 | Topic: benchmarkReaderTopic, |
| 561 | Partition: 0, |
| 562 | MinBytes: 1e3, |
| 563 | MaxBytes: 1e6, |
| 564 | MaxWait: 100 * time.Millisecond, |
| 565 | }) |
| 566 | |
| 567 | for i := 0; i < b.N; i++ { |
| 568 | if (i % 10000) == 0 { |
| 569 | r.SetOffset(-1) |
| 570 | } |
| 571 | _, err := r.ReadMessage(ctx) |
| 572 | if err != nil { |
| 573 | b.Fatal(err) |
| 574 | } |
| 575 | } |
| 576 | |
| 577 | r.Close() |
| 578 | b.SetBytes(int64(len(benchmarkReaderPayload))) |
| 579 | } |
| 580 | |
| 581 | func TestCloseLeavesGroup(t *testing.T) { |
| 582 | if os.Getenv("KAFKA_VERSION") == "2.3.1" { |
nothing calls this directly
no test coverage detected