MCPcopy
hub / github.com/segmentio/kafka-go / ReadBatchWith

Method ReadBatchWith

conn.go:758–892  ·  view source on GitHub ↗

ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured with the default values in ReadBatchConfig except for minBytes and maxBytes.

(cfg ReadBatchConfig)

Source from the content-addressed store, hash-verified

756// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
757// with the default values in ReadBatchConfig except for minBytes and maxBytes.
758func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
759
760 var adjustedDeadline time.Time
761 var maxFetch = int(c.fetchMaxBytes)
762
763 if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
764 return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
765 }
766 if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
767 return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
768 }
769 if cfg.MinBytes > cfg.MaxBytes {
770 return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
771 }
772
773 offset, whence := c.Offset()
774
775 offset, err := c.Seek(offset, whence|SeekDontCheck)
776 if err != nil {
777 return &Batch{err: dontExpectEOF(err)}
778 }
779
780 fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
781 if err != nil {
782 return &Batch{err: dontExpectEOF(err)}
783 }
784
785 id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
786 now := time.Now()
787 var timeout time.Duration
788 if cfg.MaxWait > 0 {
789 // explicitly-configured case: no changes are made to the deadline,
790 // and the timeout is sent exactly as specified.
791 timeout = cfg.MaxWait
792 } else {
793 // default case: use the original logic to adjust the conn's
794 // deadline.T
795 deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
796 timeout = deadlineToTimeout(deadline, now)
797 }
798 // save this variable outside of the closure for later use in detecting
799 // truncated messages.
800 adjustedDeadline = deadline
801 switch fetchVersion {
802 case v10:
803 return c.wb.writeFetchRequestV10(
804 id,
805 c.clientID,
806 c.topic,
807 c.partition,
808 offset,
809 cfg.MinBytes,
810 cfg.MaxBytes+int(c.fetchMinSize),
811 timeout,
812 int8(cfg.IsolationLevel),
813 )
814 case v5:
815 return c.wb.writeFetchRequestV5(

Callers 4

ReadBatchMethod · 0.95
readMethod · 0.80

Calls 15

OffsetMethod · 0.95
SeekMethod · 0.95
negotiateVersionMethod · 0.95
doRequestMethod · 0.95
waitResponseMethod · 0.95
adjustDeadlineForRTTFunction · 0.85
deadlineToTimeoutFunction · 0.85
checkTimeoutErrFunction · 0.85
newMessageSetReaderFunction · 0.85

Tested by 2