SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed. From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note that the numeric meanings of these special offsets were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.
(offset int64)
| 1023 | // were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol |
| 1024 | // specification. |
| 1025 | func (r *Reader) SetOffset(offset int64) error { |
| 1026 | if r.useConsumerGroup() { |
| 1027 | return errNotAvailableWithGroup |
| 1028 | } |
| 1029 | |
| 1030 | var err error |
| 1031 | r.mutex.Lock() |
| 1032 | |
| 1033 | if r.closed { |
| 1034 | err = io.ErrClosedPipe |
| 1035 | } else if offset != r.offset { |
| 1036 | r.withLogger(func(log Logger) { |
| 1037 | log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s", |
| 1038 | r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset)) |
| 1039 | }) |
| 1040 | r.offset = offset |
| 1041 | |
| 1042 | if r.version != 0 { |
| 1043 | r.start(r.getTopicPartitionOffset()) |
| 1044 | } |
| 1045 | |
| 1046 | r.activateReadLag() |
| 1047 | } |
| 1048 | |
| 1049 | r.mutex.Unlock() |
| 1050 | return err |
| 1051 | } |
| 1052 | |
| 1053 | // SetOffsetAt changes the offset from which the next batch of messages will be |
| 1054 | // read given the timestamp t. |