MCPcopy
hub / github.com/segmentio/kafka-go / compressMessageSet

Function compressMessageSet

write.go:501–519  ·  view source on GitHub ↗
(codec CompressionCodec, msgs ...Message)

Source from the content-addressed store, hash-verified

499}
500
501func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
502 compressed = acquireBuffer()
503 compressor := codec.NewWriter(compressed)
504 wb := &writeBuffer{w: compressor}
505 cw := &crc32Writer{table: crc32.IEEETable}
506
507 for offset, msg := range msgs {
508 wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
509 }
510
511 if err = compressor.Close(); err != nil {
512 releaseBuffer(compressed)
513 return
514 }
515
516 attributes = codec.Code()
517 size = messageSetSize(Message{Value: compressed.Bytes()})
518 return
519}
520
521func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
522 const magicByte = 1 // compatible with kafka 0.10.0.0+

Callers 1

writeProduceRequestV2Method · 0.85

Calls 8

writeMessageMethod · 0.95
CloseMethod · 0.95
acquireBufferFunction · 0.85
releaseBufferFunction · 0.85
messageSetSizeFunction · 0.85
NewWriterMethod · 0.65
CodeMethod · 0.65
BytesMethod · 0.65

Tested by

no test coverage detected