hub / github.com/segmentio/kafka-go / NewConsumerGroup

Function NewConsumerGroup

consumergroup.go:648–665

NewConsumerGroup creates a new ConsumerGroup. It returns an error if the provided configuration is invalid. It does not attempt to connect to the Kafka cluster. That happens asynchronously, and any errors will be reported by Next.

(config ConsumerGroupConfig)

Source from the content-addressed store, hash-verified

// Kafka cluster. That happens asynchronously, and any errors will be reported
// by Next.
func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}

	cg := &ConsumerGroup{
		config: config,
		next:   make(chan *Generation),
		errs:   make(chan error),
		done:   make(chan struct{}),
	}
	cg.wg.Add(1)
	go func() {
		cg.run()
		cg.wg.Done()
	}()
	return cg, nil
}

// ConsumerGroup models a Kafka consumer group. A caller doesn't interact with
// the group directly. Rather, they interact with a Generation. Every time a

Callers 9

TestClientDeleteOffset · Function · 0.85
TestClientOffsetCommit · Function · 0.85
TestClientDeleteGroups · Function · 0.85
NewReader · Function · 0.85
TestConsumerGroup · Function · 0.85
TestConsumerGroupErrors · Function · 0.85
TestClientHeartbeat · Function · 0.85

Calls 3

run · Method · 0.95
Done · Method · 0.80
Validate · Method · 0.45

Tested by 8

TestClientDeleteOffset · Function · 0.68
TestClientOffsetCommit · Function · 0.68
TestClientDeleteGroups · Function · 0.68
TestConsumerGroup · Function · 0.68
TestConsumerGroupErrors · Function · 0.68
TestClientHeartbeat · Function · 0.68