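NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset. NewReader panics if config.Validate returns an error, or if ReadBackoffMax is smaller than ReadBackoffMin once zero-valued backoff settings have been replaced by their defaults.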
(config ReaderConfig)
| 627 | // NewReader creates and returns a new Reader configured with config. |
| 628 | // The offset is initialized to FirstOffset. |
| 629 | func NewReader(config ReaderConfig) *Reader { |
| 630 | if err := config.Validate(); err != nil { |
| 631 | panic(err) |
| 632 | } |
| 633 | |
| 634 | if config.GroupID != "" { |
| 635 | if len(config.GroupBalancers) == 0 { |
| 636 | config.GroupBalancers = []GroupBalancer{ |
| 637 | RangeGroupBalancer{}, |
| 638 | RoundRobinGroupBalancer{}, |
| 639 | } |
| 640 | } |
| 641 | } |
| 642 | |
| 643 | if config.Dialer == nil { |
| 644 | config.Dialer = DefaultDialer |
| 645 | } |
| 646 | |
| 647 | if config.MaxBytes == 0 { |
| 648 | config.MaxBytes = 1e6 // 1 MB |
| 649 | } |
| 650 | |
| 651 | if config.MinBytes == 0 { |
| 652 | config.MinBytes = defaultFetchMinBytes |
| 653 | } |
| 654 | |
| 655 | if config.MaxWait == 0 { |
| 656 | config.MaxWait = 10 * time.Second |
| 657 | } |
| 658 | |
| 659 | if config.ReadBatchTimeout == 0 { |
| 660 | config.ReadBatchTimeout = 10 * time.Second |
| 661 | } |
| 662 | |
| 663 | if config.ReadLagInterval == 0 { |
| 664 | config.ReadLagInterval = 1 * time.Minute |
| 665 | } |
| 666 | |
| 667 | if config.ReadBackoffMin == 0 { |
| 668 | config.ReadBackoffMin = defaultReadBackoffMin |
| 669 | } |
| 670 | |
| 671 | if config.ReadBackoffMax == 0 { |
| 672 | config.ReadBackoffMax = defaultReadBackoffMax |
| 673 | } |
| 674 | |
| 675 | if config.ReadBackoffMax < config.ReadBackoffMin { |
| 676 | panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin)) |
| 677 | } |
| 678 | |
| 679 | if config.QueueCapacity == 0 { |
| 680 | config.QueueCapacity = 100 |
| 681 | } |
| 682 | |
| 683 | if config.MaxAttempts == 0 { |
| 684 | config.MaxAttempts = 3 |
| 685 | } |
| 686 |