MCPcopy
hub / github.com/IBM/sarama / chooseStartingOffset

Method chooseStartingOffset

consumer.go:662–687  ·  view source on GitHub ↗
(offset int64)

Source from the content-addressed store, hash-verified

660}
661
662func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
663 newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
664 if err != nil {
665 return err
666 }
667
668 child.highWaterMarkOffset.Store(newestOffset)
669
670 oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
671 if err != nil {
672 return err
673 }
674
675 switch {
676 case offset == OffsetNewest:
677 child.offset = newestOffset
678 case offset == OffsetOldest:
679 child.offset = oldestOffset
680 case offset >= oldestOffset && offset <= newestOffset:
681 child.offset = offset
682 default:
683 return ErrOffsetOutOfRange
684 }
685
686 return nil
687}
688
689func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
690 return child.messages

Callers 1

ConsumePartitionMethod · 0.95

Calls 1

GetOffsetMethod · 0.65

Tested by

no test coverage detected