MCPcopy
hub / github.com/segmentio/kafka-go / writeMessage

Method writeMessage

write.go:521–544  ·  view source on GitHub ↗
(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer)

Source from the content-addressed store, hash-verified

519}
520
521func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
522 const magicByte = 1 // compatible with kafka 0.10.0.0+
523
524 timestamp := timestamp(time)
525 size := messageSize(key, value)
526
527 // dry run to compute the checksum
528 cw.crc32 = 0
529 cw.writeInt8(magicByte)
530 cw.writeInt8(attributes)
531 cw.writeInt64(timestamp)
532 cw.writeBytes(key)
533 cw.writeBytes(value)
534
535 // actual write to the output buffer
536 wb.writeInt64(offset)
537 wb.writeInt32(size)
538 wb.writeInt32(int32(cw.crc32))
539 wb.writeInt8(magicByte)
540 wb.writeInt8(attributes)
541 wb.writeInt64(timestamp)
542 wb.writeBytes(key)
543 wb.writeBytes(value)
544}
545
546// Messages with magic >= 2 are called records. This method writes messages using message format 2.
547func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) {

Callers 2

writeProduceRequestV2Method · 0.95
compressMessageSetFunction · 0.95

Calls 6

writeInt8Method · 0.95
writeInt64Method · 0.95
writeBytesMethod · 0.95
writeInt32Method · 0.95
messageSizeFunction · 0.85
timestampFunction · 0.70

Tested by

no test coverage detected