hub / github.com/segmentio/kafka-go / WriteTo

Method WriteTo

protocol/record.go:243–293

WriteTo writes the representation of rs into w. The value of rs.Version dictates the format in which the record set is written. The error will be ErrNoRecord if rs contained no records. Note: since this package is only compatible with kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.

(w io.Writer)

Source from the content-addressed store, hash-verified

// method never produces messages in version 0. If rs.Version is zero, the
// method defaults to producing messages in version 1.
func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) {
	if rs.Records == nil {
		return 0, ErrNoRecord
	}

	// This optimization avoids rendering the record set in an intermediary
	// buffer when the writer is already a pageBuffer, which is a common case
	// due to the way WriteRequest and WriteResponse are implemented.
	buffer, _ := w.(*pageBuffer)
	bufferOffset := int64(0)

	if buffer != nil {
		bufferOffset = buffer.Size()
	} else {
		buffer = newPageBuffer()
		defer buffer.unref()
	}

	size := packUint32(0)
	buffer.Write(size[:]) // size placeholder

	var err error
	switch rs.Version {
	case 0, 1:
		err = rs.writeToVersion1(buffer, bufferOffset+4)
	case 2:
		err = rs.writeToVersion2(buffer, bufferOffset+4)
	default:
		err = fmt.Errorf("unsupported record set version %d", rs.Version)
	}
	if err != nil {
		return 0, err
	}

	n := buffer.Size() - bufferOffset
	if n == 0 {
		size = packUint32(^uint32(0))
	} else {
		size = packUint32(uint32(n) - 4)
	}
	buffer.WriteAt(size[:], bufferOffset)

	// This condition indicates that the output writer received by `WriteTo` was
	// not a *pageBuffer, in which case we need to flush the buffered records
	// data into it.
	if buffer != w {
		return buffer.WriteTo(w)
	}

	return n, nil
}

// RawRecordSet represents a record set for a RawProduce request. The record set is
// represented as a raw sequence of pre-encoded record set bytes.

Callers 3

NewRawRecordSet · Function · 0.95
ReadFrom · Method · 0.95
NewRawRecordSet · Function · 0.95

Calls 8

writeToVersion1 · Method · 0.95
writeToVersion2 · Method · 0.95
newPageBuffer · Function · 0.85
packUint32 · Function · 0.85
Size · Method · 0.45
unref · Method · 0.45
Write · Method · 0.45
WriteAt · Method · 0.45

Tested by 2

NewRawRecordSet · Function · 0.76
NewRawRecordSet · Function · 0.76