(offset int64)
| 660 | } |
| 661 | |
| 662 | func (child *partitionConsumer) chooseStartingOffset(offset int64) error { |
| 663 | newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest) |
| 664 | if err != nil { |
| 665 | return err |
| 666 | } |
| 667 | |
| 668 | child.highWaterMarkOffset.Store(newestOffset) |
| 669 | |
| 670 | oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) |
| 671 | if err != nil { |
| 672 | return err |
| 673 | } |
| 674 | |
| 675 | switch { |
| 676 | case offset == OffsetNewest: |
| 677 | child.offset = newestOffset |
| 678 | case offset == OffsetOldest: |
| 679 | child.offset = oldestOffset |
| 680 | case offset >= oldestOffset && offset <= newestOffset: |
| 681 | child.offset = offset |
| 682 | default: |
| 683 | return ErrOffsetOutOfRange |
| 684 | } |
| 685 | |
| 686 | return nil |
| 687 | } |
| 688 | |
| 689 | func (child *partitionConsumer) Messages() <-chan *ConsumerMessage { |
| 690 | return child.messages |
no test coverage detected