Returns the existing state of a partition. Including the last committed record and the last one
(partition int32, commits kadm.OffsetResponses, endsOffsets kadm.ListedOffsets)
| 603 | |
| 604 | // Returns the existing state of a partition. Including the last committed record and the last one |
| 605 | func (b *BlockBuilder) getPartitionState(partition int32, commits kadm.OffsetResponses, endsOffsets kadm.ListedOffsets) partitionState { |
| 606 | var ( |
| 607 | topic = b.cfg.IngestStorageConfig.Kafka.Topic |
| 608 | ps = partitionState{partition: partition, commitOffset: commitOffsetAtEnd, endOffset: emptyPartitionEndOffset} |
| 609 | ) |
| 610 | |
| 611 | lastCommit, found := commits.Lookup(topic, partition) |
| 612 | if found { |
| 613 | ps.commitOffset = lastCommit.At |
| 614 | } |
| 615 | |
| 616 | lastRecord, found := endsOffsets.Lookup(topic, partition) |
| 617 | if found { |
| 618 | ps.endOffset = lastRecord.Offset |
| 619 | } |
| 620 | |
| 621 | return ps |
| 622 | } |
| 623 | |
| 624 | func (b *BlockBuilder) stopping(err error) error { |
| 625 | select { |