| 133 | } |
| 134 | |
| 135 | func (p partitionState) getStartOffset() kgo.Offset { |
| 136 | if p.commitOffset > commitOffsetAtEnd { |
| 137 | return kgo.NewOffset().At(p.commitOffset) |
| 138 | } |
| 139 | // If commit offset is AtEnd (-1), it nevertheless will consume from the start. |
| 140 | // This is a workaround for franz-go and default Kafka behaviour: |
| 141 | // in case consumer is new and has no committed offsets, it will start consuming from the end, |
| 142 | // while for block builder, it should consume from the earliest record. |
| 143 | // The workaround is dirty and can break the consumer if it starts returning AtEnd (-1) for |
| 144 | // already running consumer. |
| 145 | // TODO: replace the workaround with proper new consumer offset initialization |
| 146 | // if p.commitOffset == commitOffsetAtEnd { |
| 147 | // return kgo.NewOffset().AtEnd() |
| 148 | // } |
| 149 | return kgo.NewOffset().AtStart() |
| 150 | } |
| 151 | |
| 152 | func (p partitionState) hasRecords() bool { |
| 153 | return p.endOffset > emptyPartitionEndOffset |