(ctx context.Context, offset kadm.Offset, group string, partition int32)
| 503 | } |
| 504 | |
| 505 | func (b *BlockBuilder) commitOffset(ctx context.Context, offset kadm.Offset, group string, partition int32) error { |
| 506 | offsets := make(kadm.Offsets) |
| 507 | offsets.Add(offset) |
| 508 | |
| 509 | trace.SpanFromContext(ctx).AddEvent("committing offset", trace.WithAttributes(attribute.Int64("at", offset.At))) |
| 510 | |
| 511 | boff := backoff.New(ctx, backoff.Config{ |
| 512 | MinBackoff: 100 * time.Millisecond, |
| 513 | MaxBackoff: time.Minute, |
| 514 | MaxRetries: 10, |
| 515 | }) |
| 516 | for boff.Ongoing() { |
| 517 | err := b.kadm.CommitAllOffsets(ctx, group, offsets) |
| 518 | if err == nil { |
| 519 | break |
| 520 | } |
| 521 | ingest.HandleKafkaError(err, b.kafkaClient.ForceMetadataRefresh) |
| 522 | level.Warn(b.logger).Log( |
| 523 | "msg", "failed to commit offset, retrying", |
| 524 | "err", err, |
| 525 | "partition", partition, |
| 526 | "commit_offset", offset.At, |
| 527 | ) |
| 528 | boff.Wait() |
| 529 | } |
| 530 | if err := boff.ErrCause(); err != nil { |
| 531 | return fmt.Errorf("error committing offset %d for partition %d, it won't be retried: %w", offset.At, partition, err) |
| 532 | } |
| 533 | return nil |
| 534 | } |
| 535 | |
| 536 | func formatActivePartitions(partitions []int32) string { |
| 537 | var strArr []string |
no test coverage detected