MCPcopy
hub / github.com/segmentio/kafka-go / readMessageErr

Method readMessageErr

message_test.go:730–749  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

728}
729
730func (r *readerHelper) readMessageErr() (msg Message, err error) {
731 keyFunc := func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
732 msg.Key, remain, err = readNewBytes(r, size, nbytes)
733 return
734 }
735 valueFunc := func(r *bufio.Reader, size int, nbytes int) (remain int, err error) {
736 msg.Value, remain, err = readNewBytes(r, size, nbytes)
737 return
738 }
739 var timestamp int64
740 var headers []Header
741 r.offset, _, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
742 if err != nil {
743 return
744 }
745 msg.Offset = r.offset
746 msg.Time = time.Unix(timestamp/1000, (timestamp%1000)*1000000)
747 msg.Headers = headers
748 return
749}
750
751func (r *readerHelper) readMessage() (msg Message) {
752 var err error

Callers 3

readMessageMethod · 0.95
TestV1BatchOffsetsFunction · 0.80
TestMessageSetReaderFunction · 0.80

Calls 2

readNewBytesFunction · 0.85
readMessageMethod · 0.45

Tested by

no test coverage detected