(ctx context.Context)
| 115 | } |
| 116 | |
| 117 | func (g *Generator) starting(ctx context.Context) error { |
| 118 | if g.cfg.ConsumeFromKafka { |
| 119 | kafkaClient, err := ingest.NewGroupReaderClient( |
| 120 | g.cfg.Ingest.Kafka, |
| 121 | g.partitionRing, |
| 122 | ingest.NewReaderClientMetrics("generator", prometheus.DefaultRegisterer), |
| 123 | g.logger, |
| 124 | kgo.InstanceID(g.cfg.InstanceID), |
| 125 | kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) { |
| 126 | g.handlePartitionsAssigned(m) |
| 127 | }), |
| 128 | kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) { |
| 129 | g.handlePartitionsRevoked(m) |
| 130 | }), |
| 131 | ) |
| 132 | if err != nil { |
| 133 | return fmt.Errorf("failed to create kafka reader client: %w", err) |
| 134 | } |
| 135 | |
| 136 | g.kafkaClient = kafkaClient |
| 137 | if err := ingest.WaitForKafkaBroker(ctx, g.kafkaClient.Client, g.logger); err != nil { |
| 138 | return fmt.Errorf("failed to start metrics generator: %w", err) |
| 139 | } |
| 140 | |
| 141 | g.kafkaAdm = kadm.NewClient(g.kafkaClient.Client) |
| 142 | g.partitionClient = ingest.NewPartitionOffsetClient(g.kafkaClient.Client, g.cfg.Ingest.Kafka.Topic) |
| 143 | } |
| 144 | |
| 145 | return nil |
| 146 | } |
| 147 | |
| 148 | func (g *Generator) running(ctx context.Context) error { |
| 149 | if g.cfg.ConsumeFromKafka { |
nothing calls this directly
no test coverage detected