()
| 728 | } |
| 729 | |
| 730 | func (r *readerHelper) readMessageErr() (msg Message, err error) { |
| 731 | keyFunc := func(r *bufio.Reader, size int, nbytes int) (remain int, err error) { |
| 732 | msg.Key, remain, err = readNewBytes(r, size, nbytes) |
| 733 | return |
| 734 | } |
| 735 | valueFunc := func(r *bufio.Reader, size int, nbytes int) (remain int, err error) { |
| 736 | msg.Value, remain, err = readNewBytes(r, size, nbytes) |
| 737 | return |
| 738 | } |
| 739 | var timestamp int64 |
| 740 | var headers []Header |
| 741 | r.offset, _, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc) |
| 742 | if err != nil { |
| 743 | return |
| 744 | } |
| 745 | msg.Offset = r.offset |
| 746 | msg.Time = time.Unix(timestamp/1000, (timestamp%1000)*1000000) |
| 747 | msg.Headers = headers |
| 748 | return |
| 749 | } |
| 750 | |
| 751 | func (r *readerHelper) readMessage() (msg Message) { |
| 752 | var err error |
no test coverage detected