MCPcopy
hub / github.com/grafana/tempo / startKafka

Method startKafka

modules/generator/generator_kafka.go:36–51  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

34})
35
36func (g *Generator) startKafka() {
37 g.kafkaCh = make(chan *kgo.Record, g.cfg.IngestConcurrency)
38 // Create context that will be used to stop the goroutines.
39 var ctx context.Context
40 ctx, g.kafkaStop = context.WithCancel(context.Background())
41
42 for i := uint(0); i < g.cfg.IngestConcurrency; i++ {
43 g.kafkaWG.Add(1)
44 go g.readCh(ctx)
45 }
46
47 g.kafkaWG.Add(1)
48 go g.listenKafka(ctx)
49
50 ingest.ExportPartitionLagMetrics(ctx, g.kafkaClient.Client, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions, g.kafkaClient.ForceMetadataRefresh)
51}
52
53func (g *Generator) stopKafka() {
54 g.kafkaStop()

Callers 1

running · Method · 0.95

Calls 4

readCh · Method · 0.95
listenKafka · Method · 0.95
Add · Method · 0.65

Tested by

no test coverage detected