ReadBatchWith is similar to ReadBatch in every way, but takes its settings from a ReadBatchConfig. ReadBatch uses the default ReadBatchConfig values for everything except minBytes and maxBytes.
(cfg ReadBatchConfig)
| 756 | // ReadBatchWith is similar to ReadBatch in every way. ReadBatch uses the |
| 757 | // default ReadBatchConfig values for everything except minBytes and maxBytes. |
| 758 | func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { |
| 759 | |
| 760 | var adjustedDeadline time.Time |
| 761 | var maxFetch = int(c.fetchMaxBytes) |
| 762 | |
| 763 | if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch { |
| 764 | return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)} |
| 765 | } |
| 766 | if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch { |
| 767 | return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)} |
| 768 | } |
| 769 | if cfg.MinBytes > cfg.MaxBytes { |
| 770 | return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)} |
| 771 | } |
| 772 | |
| 773 | offset, whence := c.Offset() |
| 774 | |
| 775 | offset, err := c.Seek(offset, whence|SeekDontCheck) |
| 776 | if err != nil { |
| 777 | return &Batch{err: dontExpectEOF(err)} |
| 778 | } |
| 779 | |
| 780 | fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10) |
| 781 | if err != nil { |
| 782 | return &Batch{err: dontExpectEOF(err)} |
| 783 | } |
| 784 | |
| 785 | id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error { |
| 786 | now := time.Now() |
| 787 | var timeout time.Duration |
| 788 | if cfg.MaxWait > 0 { |
| 789 | // explicitly-configured case: no changes are made to the deadline, |
| 790 | // and the timeout is sent exactly as specified. |
| 791 | timeout = cfg.MaxWait |
| 792 | } else { |
| 793 | // default case: use the original logic to adjust the conn's |
| 794 | // deadline. |
| 795 | deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) |
| 796 | timeout = deadlineToTimeout(deadline, now) |
| 797 | } |
| 798 | // save this variable outside of the closure for later use in detecting |
| 799 | // truncated messages. |
| 800 | adjustedDeadline = deadline |
| 801 | switch fetchVersion { |
| 802 | case v10: |
| 803 | return c.wb.writeFetchRequestV10( |
| 804 | id, |
| 805 | c.clientID, |
| 806 | c.topic, |
| 807 | c.partition, |
| 808 | offset, |
| 809 | cfg.MinBytes, |
| 810 | cfg.MaxBytes+int(c.fetchMinSize), |
| 811 | timeout, |
| 812 | int8(cfg.IsolationLevel), |
| 813 | ) |
| 814 | case v5: |
| 815 | return c.wb.writeFetchRequestV5( |