NewConsumerGroup creates a new ConsumerGroup. It returns an error if the provided configuration is invalid. It does not attempt to connect to the Kafka cluster. That happens asynchronously, and any errors will be reported by Next.
(config ConsumerGroupConfig)
| 646 | // Kafka cluster. That happens asynchronously, and any errors will be reported |
| 647 | // by Next. |
| 648 | func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) { |
| 649 | if err := config.Validate(); err != nil { |
| 650 | return nil, err |
| 651 | } |
| 652 | |
| 653 | cg := &ConsumerGroup{ |
| 654 | config: config, |
| 655 | next: make(chan *Generation), |
| 656 | errs: make(chan error), |
| 657 | done: make(chan struct{}), |
| 658 | } |
| 659 | cg.wg.Add(1) |
| 660 | go func() { |
| 661 | cg.run() |
| 662 | cg.wg.Done() |
| 663 | }() |
| 664 | return cg, nil |
| 665 | } |
| 666 | |
| 667 | // ConsumerGroup models a Kafka consumer group. A caller doesn't interact with |
| 668 | // the group directly. Rather, they interact with a Generation. Every time a |