(t *testing.T, conn *Conn)
| 606 | } |
| 607 | |
| 608 | func testConnReadBatchWithMaxWait(t *testing.T, conn *Conn) { |
| 609 | if _, err := conn.WriteMessages(makeTestSequence(10)...); err != nil { |
| 610 | t.Fatal(err) |
| 611 | } |
| 612 | |
| 613 | const maxBytes = 10e6 // 10 MB |
| 614 | |
| 615 | value := make([]byte, 10e3) // 10 KB |
| 616 | |
| 617 | cfg := ReadBatchConfig{ |
| 618 | MinBytes: maxBytes, // use max for both so that we hit max wait time |
| 619 | MaxBytes: maxBytes, |
| 620 | MaxWait: 500 * time.Millisecond, |
| 621 | } |
| 622 | |
| 623 | // set aa read deadline so the batch will succeed. |
| 624 | conn.SetDeadline(time.Now().Add(time.Second)) |
| 625 | batch := conn.ReadBatchWith(cfg) |
| 626 | |
| 627 | for i := 0; i < 10; i++ { |
| 628 | _, err := batch.Read(value) |
| 629 | if err != nil { |
| 630 | if err = batch.Close(); err != nil { |
| 631 | t.Fatalf("error trying to read batch message: %s", err) |
| 632 | } |
| 633 | } |
| 634 | |
| 635 | if batch.HighWaterMark() != 10 { |
| 636 | t.Fatal("expected highest offset (watermark) to be 10") |
| 637 | } |
| 638 | } |
| 639 | |
| 640 | batch.Close() |
| 641 | |
| 642 | // reset the offset and ensure that the conn deadline takes precedence over |
| 643 | // the max wait |
| 644 | conn.Seek(0, SeekAbsolute) |
| 645 | conn.SetDeadline(time.Now().Add(50 * time.Millisecond)) |
| 646 | batch = conn.ReadBatchWith(cfg) |
| 647 | var netErr net.Error |
| 648 | if err := batch.Err(); err == nil { |
| 649 | t.Fatal("should have timed out, but got no error") |
| 650 | } else if errors.As(err, &netErr) { |
| 651 | if !netErr.Timeout() { |
| 652 | t.Fatalf("should have timed out, but got: %v", err) |
| 653 | } |
| 654 | } |
| 655 | } |
| 656 | |
| 657 | func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { |
| 658 | // ensure that kafka has allocated a group coordinator. oddly, issue doesn't |
nothing calls this directly
no test coverage detected