(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer)
| 519 | } |
| 520 | |
| 521 | func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) { |
| 522 | const magicByte = 1 // compatible with kafka 0.10.0.0+ |
| 523 | |
| 524 | timestamp := timestamp(time) |
| 525 | size := messageSize(key, value) |
| 526 | |
| 527 | // dry run to compute the checksum |
| 528 | cw.crc32 = 0 |
| 529 | cw.writeInt8(magicByte) |
| 530 | cw.writeInt8(attributes) |
| 531 | cw.writeInt64(timestamp) |
| 532 | cw.writeBytes(key) |
| 533 | cw.writeBytes(value) |
| 534 | |
| 535 | // actual write to the output buffer |
| 536 | wb.writeInt64(offset) |
| 537 | wb.writeInt32(size) |
| 538 | wb.writeInt32(int32(cw.crc32)) |
| 539 | wb.writeInt8(magicByte) |
| 540 | wb.writeInt8(attributes) |
| 541 | wb.writeInt64(timestamp) |
| 542 | wb.writeBytes(key) |
| 543 | wb.writeBytes(value) |
| 544 | } |
| 545 | |
| 546 | // Messages with magic >= 2 are called records. This method writes messages using message format 2. |
| 547 | func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) { |
no test coverage detected