MCPcopy
hub / github.com/segmentio/kafka-go / FetchMessage

Method FetchMessage

reader.go:815–864  ·  view source on GitHub ↗

FetchMessage reads and returns the next message from the reader r. The method call blocks until a message becomes available or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation. The method returns io.EOF to indicate that the reader has been closed. FetchMessage does not commit offsets automatically when using consumer groups. Use CommitMessages to commit the offset.

(ctx context.Context)

Source from the content-addressed store, hash-verified

813// FetchMessage does not commit offsets automatically when using consumer groups.
814// Use CommitMessages to commit the offset.
815func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
816 r.activateReadLag()
817
818 for {
819 r.mutex.Lock()
820
821 if !r.closed && r.version == 0 {
822 r.start(r.getTopicPartitionOffset())
823 }
824
825 version := r.version
826 r.mutex.Unlock()
827
828 select {
829 case <-ctx.Done():
830 return Message{}, ctx.Err()
831
832 case err := <-r.runError:
833 return Message{}, err
834
835 case m, ok := <-r.msgs:
836 if !ok {
837 return Message{}, io.EOF
838 }
839
840 if m.version >= version {
841 r.mutex.Lock()
842
843 switch {
844 case m.error != nil:
845 case version == r.version:
846 r.offset = m.message.Offset + 1
847 r.lag = m.watermark - r.offset
848 }
849
850 r.mutex.Unlock()
851
852 if errors.Is(m.error, io.EOF) {
853 // io.EOF is used as a marker to indicate that the stream
854 // has been closed, in case it was received from the inner
855 // reader we don't want to confuse the program and replace
856 // the error with io.ErrUnexpectedEOF.
857 m.error = io.ErrUnexpectedEOF
858 }
859
860 return m.message, m.error
861 }
862 }
863 }
864}
865
866// CommitMessages commits the list of messages passed as argument. The program
867// may pass a context to asynchronously cancel the commit operation when it was

Calls 5

activateReadLagMethod · 0.95
startMethod · 0.95
DoneMethod · 0.80
ErrMethod · 0.45