(codec CompressionCodec, msgs ...Message)
| 34 | } |
| 35 | |
| 36 | func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) { |
| 37 | compressed = acquireBuffer() |
| 38 | compressor := codec.NewWriter(compressed) |
| 39 | wb := &writeBuffer{w: compressor} |
| 40 | |
| 41 | for i, msg := range msgs { |
| 42 | wb.writeRecord(0, msgs[0].Time, int64(i), msg) |
| 43 | } |
| 44 | |
| 45 | if err = compressor.Close(); err != nil { |
| 46 | releaseBuffer(compressed) |
| 47 | return |
| 48 | } |
| 49 | |
| 50 | attributes = int16(codec.Code()) |
| 51 | size = recordBatchHeaderSize + int32(compressed.Len()) |
| 52 | return |
| 53 | } |
| 54 | |
| 55 | type recordBatch struct { |
| 56 | // required input parameters |
no test coverage detected