MCPcopy
hub / github.com/segmentio/kafka-go / SetOffset

Method SetOffset

reader.go:1025–1051  ·  view source on GitHub ↗

SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed. From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note whil

(offset int64)

Source from the content-addressed store, hash-verified

1023// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
1024// specification.
1025func (r *Reader) SetOffset(offset int64) error {
1026 if r.useConsumerGroup() {
1027 return errNotAvailableWithGroup
1028 }
1029
1030 var err error
1031 r.mutex.Lock()
1032
1033 if r.closed {
1034 err = io.ErrClosedPipe
1035 } else if offset != r.offset {
1036 r.withLogger(func(log Logger) {
1037 log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
1038 r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
1039 })
1040 r.offset = offset
1041
1042 if r.version != 0 {
1043 r.start(r.getTopicPartitionOffset())
1044 }
1045
1046 r.activateReadLag()
1047 }
1048
1049 r.mutex.Unlock()
1050 return err
1051}
1052
1053// SetOffsetAt changes the offset from which the next batch of messages will be
1054// read given the timestamp t.

Calls 7

useConsumerGroupMethod · 0.95
withLoggerMethod · 0.95
startMethod · 0.95
activateReadLagMethod · 0.95
toHumanOffsetFunction · 0.85
PrintfMethod · 0.65