(codec CompressionCodec, msgs ...Message)
| 499 | } |
| 500 | |
| 501 | func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) { |
| 502 | compressed = acquireBuffer() |
| 503 | compressor := codec.NewWriter(compressed) |
| 504 | wb := &writeBuffer{w: compressor} |
| 505 | cw := &crc32Writer{table: crc32.IEEETable} |
| 506 | |
| 507 | for offset, msg := range msgs { |
| 508 | wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw) |
| 509 | } |
| 510 | |
| 511 | if err = compressor.Close(); err != nil { |
| 512 | releaseBuffer(compressed) |
| 513 | return |
| 514 | } |
| 515 | |
| 516 | attributes = codec.Code() |
| 517 | size = messageSetSize(Message{Value: compressed.Bytes()}) |
| 518 | return |
| 519 | } |
| 520 | |
| 521 | func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) { |
| 522 | const magicByte = 1 // compatible with kafka 0.10.0.0+ |
no test coverage detected