MCPcopy
hub / github.com/segmentio/kafka-go / NewReader

Function NewReader

reader.go:629–748  ·  reader.go::NewReader

NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset.

(config ReaderConfig)

Source from the content-addressed store, hash-verified

627// NewReader creates and returns a new Reader configured with config.
628// The offset is initialized to FirstOffset.
629func NewReader(config ReaderConfig) *Reader {
630 if err := config.Validate(); err != nil {
631 panic(err)
632 }
633
634 if config.GroupID != "" {
635 if len(config.GroupBalancers) == 0 {
636 config.GroupBalancers = []GroupBalancer{
637 RangeGroupBalancer{},
638 RoundRobinGroupBalancer{},
639 }
640 }
641 }
642
643 if config.Dialer == nil {
644 config.Dialer = DefaultDialer
645 }
646
647 if config.MaxBytes == 0 {
648 config.MaxBytes = 1e6 // 1 MB
649 }
650
651 if config.MinBytes == 0 {
652 config.MinBytes = defaultFetchMinBytes
653 }
654
655 if config.MaxWait == 0 {
656 config.MaxWait = 10 * time.Second
657 }
658
659 if config.ReadBatchTimeout == 0 {
660 config.ReadBatchTimeout = 10 * time.Second
661 }
662
663 if config.ReadLagInterval == 0 {
664 config.ReadLagInterval = 1 * time.Minute
665 }
666
667 if config.ReadBackoffMin == 0 {
668 config.ReadBackoffMin = defaultReadBackoffMin
669 }
670
671 if config.ReadBackoffMax == 0 {
672 config.ReadBackoffMax = defaultReadBackoffMax
673 }
674
675 if config.ReadBackoffMax < config.ReadBackoffMin {
676 panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
677 }
678
679 if config.QueueCapacity == 0 {
680 config.QueueCapacity = 100
681 }
682
683 if config.MaxAttempts == 0 {
684 config.MaxAttempts = 3
685 }
686

Calls 6

useConsumerGroupMethod · 0.95
getTopicsMethod · 0.95
runMethod · 0.95
makeSummaryFunction · 0.85
NewConsumerGroupFunction · 0.85
ValidateMethod · 0.45