(ctx context.Context, ps partitionState)
| 331 | } |
| 332 | |
| 333 | func (b *BlockBuilder) consumePartition(ctx context.Context, ps partitionState) (lastTs time.Time, commitOffset int64, err error) { |
| 334 | ctx, span := tracer.Start(ctx, "blockbuilder.consumePartition", |
| 335 | trace.WithAttributes(attribute.Int("partition", int(ps.partition)), |
| 336 | attribute.String("last_record_ts", ps.lastRecordTs.String()))) |
| 337 | |
| 338 | defer func(t time.Time) { |
| 339 | metricProcessPartitionSectionDuration.WithLabelValues(strconv.Itoa(int(ps.partition))).Observe(time.Since(t).Seconds()) |
| 340 | span.End() |
| 341 | }(time.Now()) |
| 342 | |
| 343 | var ( |
| 344 | dur = b.cfg.ConsumeCycleDuration |
| 345 | topic = b.cfg.IngestStorageConfig.Kafka.Topic |
| 346 | group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup |
| 347 | maxBytesPerCycle = b.cfg.MaxBytesPerCycle |
| 348 | partLabel = strconv.Itoa(int(ps.partition)) |
| 349 | consumedBytes uint64 |
| 350 | startOffset kgo.Offset |
| 351 | init bool |
| 352 | writer *writer |
| 353 | lastRec *kgo.Record |
| 354 | end time.Time |
| 355 | processedRecords int |
| 356 | ) |
| 357 | |
| 358 | startOffset = ps.getStartOffset() |
| 359 | |
| 360 | level.Info(b.logger).Log( |
| 361 | "msg", "consuming partition", |
| 362 | "partition", ps.partition, |
| 363 | "commit_offset", ps.commitOffset, |
| 364 | "start_offset", startOffset, |
| 365 | ) |
| 366 | // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). |
| 367 | // This ensures the cycle starts exactly at the commit offset, and not at whatever was (potentially over-)consumed previously. |
| 368 | // In the end, we remove the partition from the client (refer to the defer below) to guarantee the client always consumes |
| 369 | // from one partition at a time. I.e. when this partition is consumed, we start consuming the next one. |
| 370 | b.kafkaClient.AddConsumePartitions(map[string]map[int32]kgo.Offset{ |
| 371 | topic: { |
| 372 | ps.partition: startOffset, |
| 373 | }, |
| 374 | }) |
| 375 | defer b.kafkaClient.RemoveConsumePartitions(map[string][]int32{topic: {ps.partition}}) |
| 376 | |
| 377 | outer: |
| 378 | for { |
| 379 | fetches := func() kgo.Fetches { |
| 380 | defer func(t time.Time) { metricFetchDuration.WithLabelValues(partLabel).Observe(time.Since(t).Seconds()) }(time.Now()) |
| 381 | ctx2, cancel := context.WithTimeout(ctx, pollTimeout) |
| 382 | defer cancel() |
| 383 | return b.kafkaClient.PollFetches(ctx2) |
| 384 | }() |
| 385 | err = fetches.Err() |
| 386 | if err != nil { |
| 387 | if errors.Is(err, context.DeadlineExceeded) { |
| 388 | // No more data |
| 389 | break |
| 390 | } |
no test coverage detected