WriteTo writes the representation of rs into w. The value of rs.Version determines the format in which the record set is represented. The error will be ErrNoRecord if rs contains no records. Note: since this package is only compatible with Kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.
(w io.Writer)
| 241 | // method never produces messages in version 0. If rs.Version is zero, the |
| 242 | // method defaults to producing messages in version 1. |
| 243 | func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) { |
| 244 | if rs.Records == nil { |
| 245 | return 0, ErrNoRecord |
| 246 | } |
| 247 | |
| 248 | // This optimization avoids rendering the record set in an intermediary |
| 249 | // buffer when the writer is already a pageBuffer, which is a common case |
| 250 | // due to the way WriteRequest and WriteResponse are implemented. |
| 251 | buffer, _ := w.(*pageBuffer) |
| 252 | bufferOffset := int64(0) |
| 253 | |
| 254 | if buffer != nil { |
| 255 | bufferOffset = buffer.Size() |
| 256 | } else { |
| 257 | buffer = newPageBuffer() |
| 258 | defer buffer.unref() |
| 259 | } |
| 260 | |
| 261 | size := packUint32(0) |
| 262 | buffer.Write(size[:]) // size placeholder |
| 263 | |
| 264 | var err error |
| 265 | switch rs.Version { |
| 266 | case 0, 1: |
| 267 | err = rs.writeToVersion1(buffer, bufferOffset+4) |
| 268 | case 2: |
| 269 | err = rs.writeToVersion2(buffer, bufferOffset+4) |
| 270 | default: |
| 271 | err = fmt.Errorf("unsupported record set version %d", rs.Version) |
| 272 | } |
| 273 | if err != nil { |
| 274 | return 0, err |
| 275 | } |
| 276 | |
| 277 | n := buffer.Size() - bufferOffset |
| 278 | if n == 0 { |
| 279 | size = packUint32(^uint32(0)) |
| 280 | } else { |
| 281 | size = packUint32(uint32(n) - 4) |
| 282 | } |
| 283 | buffer.WriteAt(size[:], bufferOffset) |
| 284 | |
| 285 | // This condition indicates that the output writer received by `WriteTo` was |
| 286 | // not a *pageBuffer, in which case we need to flush the buffered records |
| 287 | // data into it. |
| 288 | if buffer != w { |
| 289 | return buffer.WriteTo(w) |
| 290 | } |
| 291 | |
| 292 | return n, nil |
| 293 | } |
| 294 | |
| 295 | // RawRecordSet represents a record set for a RawProduce request. The record set is |
| 296 | // represented as a raw sequence of pre-encoded record set bytes. |