FetchMessage reads and returns the next message from the reader. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation. The method returns io.EOF to indicate that the reader has been closed. Fe
(ctx context.Context)
| 813 | // FetchMessage does not commit offsets automatically when using consumer groups. |
| 814 | // Use CommitMessages to commit the offset. |
| 815 | func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { |
| 816 | r.activateReadLag() |
| 817 | |
| 818 | for { |
| 819 | r.mutex.Lock() |
| 820 | |
| 821 | if !r.closed && r.version == 0 { |
| 822 | r.start(r.getTopicPartitionOffset()) |
| 823 | } |
| 824 | |
| 825 | version := r.version |
| 826 | r.mutex.Unlock() |
| 827 | |
| 828 | select { |
| 829 | case <-ctx.Done(): |
| 830 | return Message{}, ctx.Err() |
| 831 | |
| 832 | case err := <-r.runError: |
| 833 | return Message{}, err |
| 834 | |
| 835 | case m, ok := <-r.msgs: |
| 836 | if !ok { |
| 837 | return Message{}, io.EOF |
| 838 | } |
| 839 | |
| 840 | if m.version >= version { |
| 841 | r.mutex.Lock() |
| 842 | |
| 843 | switch { |
| 844 | case m.error != nil: |
| 845 | case version == r.version: |
| 846 | r.offset = m.message.Offset + 1 |
| 847 | r.lag = m.watermark - r.offset |
| 848 | } |
| 849 | |
| 850 | r.mutex.Unlock() |
| 851 | |
| 852 | if errors.Is(m.error, io.EOF) { |
| 853 | // io.EOF is used as a marker to indicate that the stream |
| 854 | // has been closed, in case it was received from the inner |
| 855 | // reader we don't want to confuse the program and replace |
| 856 | // the error with io.ErrUnexpectedEOF. |
| 857 | m.error = io.ErrUnexpectedEOF |
| 858 | } |
| 859 | |
| 860 | return m.message, m.error |
| 861 | } |
| 862 | } |
| 863 | } |
| 864 | } |
| 865 | |
| 866 | // CommitMessages commits the list of messages passed as argument. The program |
| 867 | // may pass a context to asynchronously cancel the commit operation when it was |