()
| 34 | }) |
| 35 | |
| 36 | func (g *Generator) startKafka() { |
| 37 | g.kafkaCh = make(chan *kgo.Record, g.cfg.IngestConcurrency) |
| 38 | // Create context that will be used to stop the goroutines. |
| 39 | var ctx context.Context |
| 40 | ctx, g.kafkaStop = context.WithCancel(context.Background()) |
| 41 | |
| 42 | for i := uint(0); i < g.cfg.IngestConcurrency; i++ { |
| 43 | g.kafkaWG.Add(1) |
| 44 | go g.readCh(ctx) |
| 45 | } |
| 46 | |
| 47 | g.kafkaWG.Add(1) |
| 48 | go g.listenKafka(ctx) |
| 49 | |
| 50 | ingest.ExportPartitionLagMetrics(ctx, g.kafkaClient.Client, g.logger, g.cfg.Ingest, g.getAssignedActivePartitions, g.kafkaClient.ForceMetadataRefresh) |
| 51 | } |
| 52 | |
| 53 | func (g *Generator) stopKafka() { |
| 54 | g.kafkaStop() |
no test coverage detected