(kafkaCfg KafkaConfig, partitionRing ring.PartitionRingReader, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt)
| 51 | } |
| 52 | |
| 53 | func NewGroupReaderClient(kafkaCfg KafkaConfig, partitionRing ring.PartitionRingReader, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*Client, error) { |
| 54 | opts = append(opts, |
| 55 | kgo.ConsumerGroup(kafkaCfg.ConsumerGroup), |
| 56 | kgo.ConsumeTopics(kafkaCfg.Topic), |
| 57 | kgo.SessionTimeout(3*time.Minute), |
| 58 | kgo.RebalanceTimeout(5*time.Minute), |
| 59 | kgo.Balancers(NewCooperativeActiveStickyBalancer(partitionRing)), |
| 60 | kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), |
| 61 | ) |
| 62 | |
| 63 | client, err := NewReaderClient(kafkaCfg, metrics, logger, opts...) |
| 64 | if err != nil { |
| 65 | return nil, err |
| 66 | } |
| 67 | |
| 68 | c := &Client{ |
| 69 | Client: client, |
| 70 | logger: logger, |
| 71 | stopCh: make(chan struct{}), |
| 72 | partitionRing: partitionRing, |
| 73 | } |
| 74 | // Start the partition monitor goroutine |
| 75 | c.wg.Add(1) |
| 76 | go c.monitorPartitions() |
| 77 | |
| 78 | return c, nil |
| 79 | } |
| 80 | |
| 81 | func (c *Client) monitorPartitions() { |
| 82 | defer c.wg.Done() |
no test coverage detected