MCPcopy
hub / github.com/segmentio/kafka-go / Read

Method Read

batch.go:140–185  ·  view source on GitHub ↗

Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read. If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.

(b []byte)

Source from the content-addressed store, hash-verified

138// The method fails with io.ErrShortBuffer if the buffer passed as argument is
139// too small to hold the message value.
140func (batch *Batch) Read(b []byte) (int, error) {
141 n := 0
142
143 batch.mutex.Lock()
144 offset := batch.offset
145
146 _, _, _, err := batch.readMessage(
147 func(r *bufio.Reader, size int, nbytes int) (int, error) {
148 if nbytes < 0 {
149 return size, nil
150 }
151 return discardN(r, size, nbytes)
152 },
153 func(r *bufio.Reader, size int, nbytes int) (int, error) {
154 if nbytes < 0 {
155 return size, nil
156 }
157 // make sure there are enough bytes for the message value. return
158 // errShortRead if the message is truncated.
159 if nbytes > size {
160 return size, errShortRead
161 }
162 n = nbytes // return value
163 if nbytes > cap(b) {
164 nbytes = cap(b)
165 }
166 if nbytes > len(b) {
167 b = b[:nbytes]
168 }
169 nbytes, err := io.ReadFull(r, b[:nbytes])
170 if err != nil {
171 return size - nbytes, err
172 }
173 return discardN(r, size-nbytes, n-nbytes)
174 },
175 )
176
177 if err == nil && n > len(b) {
178 n, err = len(b), io.ErrShortBuffer
179 batch.err = io.ErrShortBuffer
180 batch.offset = offset // rollback
181 }
182
183 batch.mutex.Unlock()
184 return n, err
185}
186
187// ReadMessage reads and returns the next message from the batch.
188//

Callers

nothing calls this directly

Calls 2

readMessageMethod · 0.95
discardNFunction · 0.85

Tested by

no test coverage detected