It fetches all the offsets for the blockbuilder topic, for each owned partitions it calculates their last committed records and the end record offset. Based on that it sort the partitions by lag
(ctx context.Context, partitions []int32)
| 544 | // It fetches all the offsets for the blockbuilder topic, for each owned partitions it calculates their last committed records and the |
| 545 | // end record offset. Based on that it sort the partitions by lag |
| 546 | func (b *BlockBuilder) fetchPartitions(ctx context.Context, partitions []int32) ([]partitionState, error) { |
| 547 | var ( |
| 548 | ps = make([]partitionState, 0, len(partitions)) |
| 549 | commits kadm.OffsetResponses |
| 550 | endsOffsets kadm.ListedOffsets |
| 551 | err error |
| 552 | ) |
| 553 | |
| 554 | boff := backoff.New(ctx, backoff.Config{ |
| 555 | MinBackoff: 100 * time.Millisecond, |
| 556 | MaxBackoff: time.Minute, |
| 557 | MaxRetries: 5, |
| 558 | }) |
| 559 | for boff.Ongoing() { |
| 560 | commits, endsOffsets, err = b.getPartitionOffsets(ctx, partitions) |
| 561 | if err == nil { |
| 562 | break |
| 563 | } |
| 564 | ingest.HandleKafkaError(err, b.kafkaClient.ForceMetadataRefresh) |
| 565 | boff.Wait() |
| 566 | } |
| 567 | if err != nil { |
| 568 | return nil, fmt.Errorf("failed to fetch partition offsets: %w", err) |
| 569 | } |
| 570 | for _, partition := range partitions { |
| 571 | p := b.getPartitionState(partition, commits, endsOffsets) |
| 572 | ps = append(ps, p) |
| 573 | } |
| 574 | |
| 575 | return ps, nil |
| 576 | } |
| 577 | |
| 578 | // todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder. |
| 579 | // improve it to only fetch the offsets for the assigned partitions |
no test coverage detected