MCPcopy
hub / github.com/grafana/tempo / getPartitionOffsets

Method getPartitionOffsets

modules/blockbuilder/blockbuilder.go:580–602  ·  view source on GitHub ↗

todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder. improve it to only fetch the offsets for the assigned partitions

(ctx context.Context, partitionIDs []int32)

Source from the content-addressed store, hash-verified

578// todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder.
579// improve it to only fetch the offsets for the assigned partitions
580func (b *BlockBuilder) getPartitionOffsets(ctx context.Context, partitionIDs []int32) (kadm.OffsetResponses, kadm.ListedOffsets, error) {
581 var (
582 topic = b.cfg.IngestStorageConfig.Kafka.Topic
583 group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup
584 )
585 commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic)
586 if err != nil {
587 return nil, nil, err
588 }
589 if err := commits.Error(); err != nil {
590 return nil, nil, err
591 }
592
593 endsOffsets, err := b.partitionOffsetClient.FetchPartitionsLastProducedOffsets(ctx, partitionIDs)
594 if err != nil {
595 return nil, nil, err
596 }
597 if err := endsOffsets.Error(); err != nil {
598 return nil, nil, err
599 }
600
601 return commits, endsOffsets, nil
602}
603
604// Returns the existing state of a partition. Including the last committed record and the last one
605func (b *BlockBuilder) getPartitionState(partition int32, commits kadm.OffsetResponses, endsOffsets kadm.ListedOffsets) partitionState {

Callers 1

fetchPartitionsMethod · 0.95

Calls 2

ErrorMethod · 0.65

Tested by

no test coverage detected