MCPcopy
hub / github.com/grafana/tempo / starting

Method starting

modules/generator/generator.go:117–146  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

115}
116
117func (g *Generator) starting(ctx context.Context) error {
118 if g.cfg.ConsumeFromKafka {
119 kafkaClient, err := ingest.NewGroupReaderClient(
120 g.cfg.Ingest.Kafka,
121 g.partitionRing,
122 ingest.NewReaderClientMetrics("generator", prometheus.DefaultRegisterer),
123 g.logger,
124 kgo.InstanceID(g.cfg.InstanceID),
125 kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
126 g.handlePartitionsAssigned(m)
127 }),
128 kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, m map[string][]int32) {
129 g.handlePartitionsRevoked(m)
130 }),
131 )
132 if err != nil {
133 return fmt.Errorf("failed to create kafka reader client: %w", err)
134 }
135
136 g.kafkaClient = kafkaClient
137 if err := ingest.WaitForKafkaBroker(ctx, g.kafkaClient.Client, g.logger); err != nil {
138 return fmt.Errorf("failed to start metrics generator: %w", err)
139 }
140
141 g.kafkaAdm = kadm.NewClient(g.kafkaClient.Client)
142 g.partitionClient = ingest.NewPartitionOffsetClient(g.kafkaClient.Client, g.cfg.Ingest.Kafka.Topic)
143 }
144
145 return nil
146}
147
148func (g *Generator) running(ctx context.Context) error {
149 if g.cfg.ConsumeFromKafka {

Callers

nothing calls this directly

Calls 6

NewGroupReaderClient Function · 0.92
NewReaderClientMetrics Function · 0.92
WaitForKafkaBroker Function · 0.92
NewPartitionOffsetClient Function · 0.92

Tested by

no test coverage detected