MCPcopy
hub / github.com/grafana/tempo / commitOffset

Method commitOffset

modules/blockbuilder/blockbuilder.go:505–534  ·  view source on GitHub ↗
(ctx context.Context, offset kadm.Offset, group string, partition int32)

Source from the content-addressed store, hash-verified

503}
504
505func (b *BlockBuilder) commitOffset(ctx context.Context, offset kadm.Offset, group string, partition int32) error {
506 offsets := make(kadm.Offsets)
507 offsets.Add(offset)
508
509 trace.SpanFromContext(ctx).AddEvent("committing offset", trace.WithAttributes(attribute.Int64("at", offset.At)))
510
511 boff := backoff.New(ctx, backoff.Config{
512 MinBackoff: 100 * time.Millisecond,
513 MaxBackoff: time.Minute,
514 MaxRetries: 10,
515 })
516 for boff.Ongoing() {
517 err := b.kadm.CommitAllOffsets(ctx, group, offsets)
518 if err == nil {
519 break
520 }
521 ingest.HandleKafkaError(err, b.kafkaClient.ForceMetadataRefresh)
522 level.Warn(b.logger).Log(
523 "msg", "failed to commit offset, retrying",
524 "err", err,
525 "partition", partition,
526 "commit_offset", offset.At,
527 )
528 boff.Wait()
529 }
530 if err := boff.ErrCause(); err != nil {
531 return fmt.Errorf("error committing offset %d for partition %d, it won't be retried: %w", offset.At, partition, err)
532 }
533 return nil
534}
535
536func formatActivePartitions(partitions []int32) string {
537 var strArr []string

Callers 1

consumePartition Method · 0.95

Calls 4

HandleKafkaError Function · 0.92
Add Method · 0.65
Log Method · 0.65
Wait Method · 0.65

Tested by

no test coverage detected