MCPcopy
hub / github.com/grafana/tempo / starting

Method starting

modules/blockbuilder/blockbuilder.go:183–223  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

181}
182
183func (b *BlockBuilder) starting(ctx context.Context) (err error) {
184 level.Info(b.logger).Log("msg", "block builder starting")
185 topic := b.cfg.IngestStorageConfig.Kafka.Topic
186
187 b.enc, err = encoding.FromVersionForWrites(b.cfg.BlockConfig.Version)
188 if err != nil {
189 return fmt.Errorf("failed to create encoding: %w", err)
190 }
191
192 b.wal, err = wal.New(&b.cfg.WAL)
193 if err != nil {
194 return fmt.Errorf("failed to create WAL: %w", err)
195 }
196
197 b.kafkaClient, err = ingest.NewReaderClient(
198 b.cfg.IngestStorageConfig.Kafka,
199 ingest.NewReaderClientMetrics(blockBuilderServiceName, prometheus.DefaultRegisterer),
200 b.logger,
201 )
202 if err != nil {
203 return fmt.Errorf("failed to create kafka reader client: %w", err)
204 }
205
206 err = ingest.WaitForKafkaBroker(ctx, b.kafkaClient, b.logger)
207 if err != nil {
208 return fmt.Errorf("failed to start blockbuilder: %w", err)
209 }
210
211 b.partitionOffsetClient = ingest.NewPartitionOffsetClient(b.kafkaClient, topic)
212 b.kadm = kadm.NewClient(b.kafkaClient)
213
214 ingest.ExportPartitionLagMetrics(
215 ctx,
216 b.kafkaClient,
217 b.logger,
218 b.cfg.IngestStorageConfig,
219 b.getAssignedPartitions,
220 b.kafkaClient.ForceMetadataRefresh)
221
222 return nil
223}
224
225func (b *BlockBuilder) running(ctx context.Context) error {
226 defer close(b.consumeStopped)

Callers 2

BenchmarkBlockBuilderFunction · 0.45

Calls 8

FromVersionForWritesFunction · 0.92
NewFunction · 0.92
NewReaderClientFunction · 0.92
NewReaderClientMetricsFunction · 0.92
WaitForKafkaBrokerFunction · 0.92
NewPartitionOffsetClientFunction · 0.92
LogMethod · 0.65

Tested by 2

BenchmarkBlockBuilderFunction · 0.36