MCPcopy
hub / github.com/grafana/tempo / readKafka

Method readKafka

modules/generator/generator_kafka.go:98–131  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

96}
97
98func (g *Generator) readKafka(ctx context.Context) error {
99 fetches := g.kafkaClient.PollFetches(ctx)
100 fetches.EachError(func(_ string, _ int32, err error) {
101 if !errors.Is(err, context.Canceled) {
102 level.Error(g.logger).Log("msg", "failed to fetch records", "err", err)
103 }
104 })
105 if err := fetches.Err(); err != nil && !errors.Is(err, context.Canceled) {
106 return err
107 }
108
109 // Metric lag based on first message in each partition.
110 // This balances overhead with granularity.
111 fetches.EachPartition(func(p kgo.FetchTopicPartition) {
112 if len(p.Records) > 0 {
113 lag := time.Since(p.Records[0].Timestamp)
114 ingest.SetPartitionLagSeconds(g.cfg.Ingest.Kafka.ConsumerGroup, p.Partition, lag)
115 }
116 })
117
118 start := time.Now()
119
120 for iter := fetches.RecordIter(); !iter.Done(); {
121 select {
122 case g.kafkaCh <- iter.Next():
123 case <-ctx.Done():
124 return ctx.Err()
125 }
126 }
127
128 metricEnqueueTime.Add(time.Since(start).Seconds())
129
130 return nil
131}
132
133// readCh reads records from the internal channel.
134// This allows for offloading the expensive proto unmarshal

Callers 1

listenKafkaMethod · 0.95

Calls 7

SetPartitionLagSecondsFunction · 0.92
LogMethod · 0.65
ErrorMethod · 0.65
NowMethod · 0.65
DoneMethod · 0.65
NextMethod · 0.65
AddMethod · 0.65

Tested by

no test coverage detected