MCPcopy
hub / github.com/grafana/tempo / readCh

Method readCh

modules/generator/generator_kafka.go:136–178  ·  view source on GitHub ↗

readCh reads records from the internal channel. This allows for offloading the expensive proto unmarshal to multiple goroutines.

(ctx context.Context)

Source from the content-addressed store, hash-verified

134// This allows for offloading the expensive proto unmarshal
135// to multiple goroutines.
136func (g *Generator) readCh(ctx context.Context) {
137 defer g.kafkaWG.Done()
138
139 var c ingest.GeneratorCodec
140 switch g.cfg.Codec {
141 case codecPushBytes:
142 c = ingest.NewPushBytesDecoder()
143 case codecOTLP:
144 c = ingest.NewOTLPDecoder()
145 }
146
147 for {
148 var r *kgo.Record
149 select {
150 case r = <-g.kafkaCh:
151 case <-ctx.Done():
152 return
153 }
154
155 tenant := string(r.Key)
156
157 i, err := g.getOrCreateInstance(tenant)
158 if err != nil {
159 level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel getOrCreateInstance", "err", err)
160 continue
161 }
162
163 iterator, err := c.Decode(r.Value)
164 if err != nil {
165 level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel decode", "err", err)
166 continue
167 }
168
169 for resourceSpans, err := range iterator {
170 if err != nil {
171 level.Error(g.logger).Log("tenant", tenant, "msg", "consumeKafkaChannel unmarshal", "err", err)
172 continue
173 }
174
175 i.pushSpansFromQueue(ctx, r.Timestamp, resourceSpans)
176 }
177 }
178}
179
180func (g *Generator) getAssignedActivePartitions() []int32 {
181 g.partitionMtx.Lock()

Callers 1

startKafkaMethod · 0.95

Calls 8

getOrCreateInstanceMethod · 0.95
DecodeMethod · 0.95
NewPushBytesDecoderFunction · 0.92
NewOTLPDecoderFunction · 0.92
pushSpansFromQueueMethod · 0.80
DoneMethod · 0.65
LogMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected