MCPcopy
hub / github.com/segmentio/kafka-go / testConnReadBatchWithMaxWait

Function testConnReadBatchWithMaxWait

conn_test.go:608–655  ·  view source on GitHub ↗
(t *testing.T, conn *Conn)

Source from the content-addressed store, hash-verified

606}
607
608func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) {
609 if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil {
610 t.Fatal(err)
611 }
612
613 const maxBytes = 10e6 // 10 MB
614
615 value := make([]byte, 10e3) // 10 KB
616
617 cfg := ReadBatchConfig{
618 MinBytes: maxBytes, // use max for both so that we hit max wait time
619 MaxBytes: maxBytes,
620 MaxWait: 500 * time.Millisecond,
621 }
622
623 // set aa read deadline so the batch will succeed.
624 conn.SetDeadline(time.Now().Add(time.Second))
625 batch := conn.ReadBatchWith(cfg)
626
627 for i := 0; i < 10; i++ {
628 _, err := batch.Read(value)
629 if err != nil {
630 if err = batch.Close(); err != nil {
631 t.Fatalf("error trying to read batch message: %s", err)
632 }
633 }
634
635 if batch.HighWaterMark() != 10 {
636 t.Fatal("expected highest offset (watermark) to be 10")
637 }
638 }
639
640 batch.Close()
641
642 // reset the offset and ensure that the conn deadline takes precedence over
643 // the max wait
644 conn.Seek(0, SeekAbsolute)
645 conn.SetDeadline(time.Now().Add(50 * time.Millisecond))
646 batch = conn.ReadBatchWith(cfg)
647 var netErr net.Error
648 if err := batch.Err(); err == nil {
649 t.Fatal("should have timed out, but got no error")
650 } else if errors.As(err, &netErr) {
651 if !netErr.Timeout() {
652 t.Fatalf("should have timed out, but got: %v", err)
653 }
654 }
655}
656
657func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
658 // ensure that kafka has allocated a group coordinator. oddly, issue doesn't

Callers

nothing calls this directly

Calls 10

TimeoutMethod · 0.95
makeTestSequenceFunction · 0.85
ReadBatchWithMethod · 0.80
HighWaterMarkMethod · 0.80
WriteMessagesMethod · 0.45
SetDeadlineMethod · 0.45
ReadMethod · 0.45
CloseMethod · 0.45
SeekMethod · 0.45
ErrMethod · 0.45

Tested by

no test coverage detected