todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder. improve it to only fetch the offsets for the assigned partitions
(ctx context.Context, partitionIDs []int32)
| 578 | // todo: this function fetches the offsets for all the partitions including the ones that are not assigned to this block builder. |
| 579 | // improve it to only fetch the offsets for the assigned partitions |
| 580 | func (b *BlockBuilder) getPartitionOffsets(ctx context.Context, partitionIDs []int32) (kadm.OffsetResponses, kadm.ListedOffsets, error) { |
| 581 | var ( |
| 582 | topic = b.cfg.IngestStorageConfig.Kafka.Topic |
| 583 | group = b.cfg.IngestStorageConfig.Kafka.ConsumerGroup |
| 584 | ) |
| 585 | commits, err := b.kadm.FetchOffsetsForTopics(ctx, group, topic) |
| 586 | if err != nil { |
| 587 | return nil, nil, err |
| 588 | } |
| 589 | if err := commits.Error(); err != nil { |
| 590 | return nil, nil, err |
| 591 | } |
| 592 | |
| 593 | endsOffsets, err := b.partitionOffsetClient.FetchPartitionsLastProducedOffsets(ctx, partitionIDs) |
| 594 | if err != nil { |
| 595 | return nil, nil, err |
| 596 | } |
| 597 | if err := endsOffsets.Error(); err != nil { |
| 598 | return nil, nil, err |
| 599 | } |
| 600 | |
| 601 | return commits, endsOffsets, nil |
| 602 | } |
| 603 | |
| 604 | // Returns the existing state of a partition. Including the last committed record and the last one |
| 605 | func (b *BlockBuilder) getPartitionState(partition int32, commits kadm.OffsetResponses, endsOffsets kadm.ListedOffsets) partitionState { |
no test coverage detected