MCPcopy
hub / github.com/grafana/tempo / NewGroupReaderClient

Function NewGroupReaderClient

pkg/ingest/reader_client.go:53–79  ·  view source on GitHub ↗
(kafkaCfg KafkaConfig, partitionRing ring.PartitionRingReader, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt)

Source from the content-addressed store, hash-verified

51}
52
53func NewGroupReaderClient(kafkaCfg KafkaConfig, partitionRing ring.PartitionRingReader, metrics *kprom.Metrics, logger log.Logger, opts ...kgo.Opt) (*Client, error) {
54 opts = append(opts,
55 kgo.ConsumerGroup(kafkaCfg.ConsumerGroup),
56 kgo.ConsumeTopics(kafkaCfg.Topic),
57 kgo.SessionTimeout(3*time.Minute),
58 kgo.RebalanceTimeout(5*time.Minute),
59 kgo.Balancers(NewCooperativeActiveStickyBalancer(partitionRing)),
60 kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
61 )
62
63 client, err := NewReaderClient(kafkaCfg, metrics, logger, opts...)
64 if err != nil {
65 return nil, err
66 }
67
68 c := &Client{
69 Client: client,
70 logger: logger,
71 stopCh: make(chan struct{}),
72 partitionRing: partitionRing,
73 }
74 // Start the partition monitor goroutine
75 c.wg.Add(1)
76 go c.monitorPartitions()
77
78 return c, nil
79}
80
81func (c *Client) monitorPartitions() {
82 defer c.wg.Done()

Callers 1

starting (Method) · 0.92

Calls 4

monitorPartitions (Method) · 0.95
NewReaderClient (Function) · 0.85
Add (Method) · 0.65

Tested by

no test coverage detected