| 181 | } |
| 182 | |
| 183 | func (b *BlockBuilder) starting(ctx context.Context) (err error) { |
| 184 | level.Info(b.logger).Log("msg", "block builder starting") |
| 185 | topic := b.cfg.IngestStorageConfig.Kafka.Topic |
| 186 | |
| 187 | b.enc, err = encoding.FromVersionForWrites(b.cfg.BlockConfig.Version) |
| 188 | if err != nil { |
| 189 | return fmt.Errorf("failed to create encoding: %w", err) |
| 190 | } |
| 191 | |
| 192 | b.wal, err = wal.New(&b.cfg.WAL) |
| 193 | if err != nil { |
| 194 | return fmt.Errorf("failed to create WAL: %w", err) |
| 195 | } |
| 196 | |
| 197 | b.kafkaClient, err = ingest.NewReaderClient( |
| 198 | b.cfg.IngestStorageConfig.Kafka, |
| 199 | ingest.NewReaderClientMetrics(blockBuilderServiceName, prometheus.DefaultRegisterer), |
| 200 | b.logger, |
| 201 | ) |
| 202 | if err != nil { |
| 203 | return fmt.Errorf("failed to create kafka reader client: %w", err) |
| 204 | } |
| 205 | |
| 206 | err = ingest.WaitForKafkaBroker(ctx, b.kafkaClient, b.logger) |
| 207 | if err != nil { |
| 208 | return fmt.Errorf("failed to start blockbuilder: %w", err) |
| 209 | } |
| 210 | |
| 211 | b.partitionOffsetClient = ingest.NewPartitionOffsetClient(b.kafkaClient, topic) |
| 212 | b.kadm = kadm.NewClient(b.kafkaClient) |
| 213 | |
| 214 | ingest.ExportPartitionLagMetrics( |
| 215 | ctx, |
| 216 | b.kafkaClient, |
| 217 | b.logger, |
| 218 | b.cfg.IngestStorageConfig, |
| 219 | b.getAssignedPartitions, |
| 220 | b.kafkaClient.ForceMetadataRefresh) |
| 221 | |
| 222 | return nil |
| 223 | } |
| 224 | |
| 225 | func (b *BlockBuilder) running(ctx context.Context) error { |
| 226 | defer close(b.consumeStopped) |