| 80 | } |
| 81 | |
| 82 | func (m *Message) encode(pe packetEncoder) error { |
| 83 | pe.push(newCRC32Field(crcIEEE)) |
| 84 | |
| 85 | pe.putInt8(m.Version) |
| 86 | |
| 87 | attributes := int8(m.Codec) & compressionCodecMask |
| 88 | if m.LogAppendTime { |
| 89 | attributes |= timestampTypeMask |
| 90 | } |
| 91 | pe.putInt8(attributes) |
| 92 | |
| 93 | if m.Version >= 1 { |
| 94 | if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil { |
| 95 | return err |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | err := pe.putBytes(m.Key) |
| 100 | if err != nil { |
| 101 | return err |
| 102 | } |
| 103 | |
| 104 | var payload []byte |
| 105 | |
| 106 | if m.compressedCache != nil { |
| 107 | payload = m.compressedCache |
| 108 | m.compressedCache = nil |
| 109 | } else if m.Value != nil { |
| 110 | payload, err = compress(m.Codec, m.CompressionLevel, m.Value) |
| 111 | if err != nil { |
| 112 | return err |
| 113 | } |
| 114 | m.compressedCache = payload |
| 115 | // Keep in mind the compressed payload size for metric gathering |
| 116 | m.compressedSize = len(payload) |
| 117 | } |
| 118 | |
| 119 | if err = pe.putBytes(payload); err != nil { |
| 120 | return err |
| 121 | } |
| 122 | |
| 123 | return pe.pop() |
| 124 | } |
| 125 | |
| 126 | func (m *Message) decode(pd packetDecoder) (err error) { |
| 127 | crc32Decoder := acquireCrc32Field(crcIEEE) |