MCPcopy
hub / github.com/grafana/tempo / fetchPartitions

Method fetchPartitions

modules/blockbuilder/blockbuilder.go:546–576  ·  view source on GitHub ↗

It fetches all the offsets for the blockbuilder topic, for each owned partitions it calculates their last committed records and the end record offset. Based on that it sort the partitions by lag

(ctx context.Context, partitions []int32)

Source from the content-addressed store, hash-verified

544// It fetches all the offsets for the blockbuilder topic, for each owned partitions it calculates their last committed records and the
545// end record offset. Based on that it sort the partitions by lag
546func (b *BlockBuilder) fetchPartitions(ctx context.Context, partitions []int32) ([]partitionState, error) {
547 var (
548 ps = make([]partitionState, 0, len(partitions))
549 commits kadm.OffsetResponses
550 endsOffsets kadm.ListedOffsets
551 err error
552 )
553
554 boff := backoff.New(ctx, backoff.Config{
555 MinBackoff: 100 * time.Millisecond,
556 MaxBackoff: time.Minute,
557 MaxRetries: 5,
558 })
559 for boff.Ongoing() {
560 commits, endsOffsets, err = b.getPartitionOffsets(ctx, partitions)
561 if err == nil {
562 break
563 }
564 ingest.HandleKafkaError(err, b.kafkaClient.ForceMetadataRefresh)
565 boff.Wait()
566 }
567 if err != nil {
568 return nil, fmt.Errorf("failed to fetch partition offsets: %w", err)
569 }
570 for _, partition := range partitions {
571 p := b.getPartitionState(partition, commits, endsOffsets)
572 ps = append(ps, p)
573 }
574
575 return ps, nil
576}
577
578// todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder.
579// improve it to only fetch the offsets for the assigned partitions

Callers 1

consumeMethod · 0.95

Calls 4

getPartitionOffsetsMethod · 0.95
getPartitionStateMethod · 0.95
HandleKafkaErrorFunction · 0.92
WaitMethod · 0.65

Tested by

no test coverage detected