(buffer *pageBuffer, bufferOffset int64)
| 189 | } |
| 190 | |
| 191 | func (rs *RecordSet) writeToVersion2(buffer *pageBuffer, bufferOffset int64) error { |
| 192 | records := rs.Records |
| 193 | numRecords := int32(0) |
| 194 | |
| 195 | e := &encoder{writer: buffer} |
| 196 | e.writeInt64(0) // base offset | 0 +8 |
| 197 | e.writeInt32(0) // placeholder for record batch length | 8 +4 |
| 198 | e.writeInt32(-1) // partition leader epoch | 12 +4 |
| 199 | e.writeInt8(2) // magic byte | 16 +1 |
| 200 | e.writeInt32(0) // placeholder for crc32 checksum | 17 +4 |
| 201 | e.writeInt16(int16(rs.Attributes)) // attributes | 21 +2 |
| 202 | e.writeInt32(0) // placeholder for lastOffsetDelta | 23 +4 |
| 203 | e.writeInt64(0) // placeholder for firstTimestamp | 27 +8 |
| 204 | e.writeInt64(0) // placeholder for maxTimestamp | 35 +8 |
| 205 | e.writeInt64(-1) // producer id | 43 +8 |
| 206 | e.writeInt16(-1) // producer epoch | 51 +2 |
| 207 | e.writeInt32(-1) // base sequence | 53 +4 |
| 208 | e.writeInt32(0) // placeholder for numRecords | 57 +4 |
| 209 | |
| 210 | var compressor io.WriteCloser |
| 211 | if compression := rs.Attributes.Compression(); compression != 0 { |
| 212 | if codec := compression.Codec(); codec != nil { |
| 213 | compressor = codec.NewWriter(buffer) |
| 214 | e.writer = compressor |
| 215 | } |
| 216 | } |
| 217 | |
| 218 | currentTimestamp := timestamp(time.Now()) |
| 219 | lastOffsetDelta := int32(0) |
| 220 | firstTimestamp := int64(0) |
| 221 | maxTimestamp := int64(0) |
| 222 | |
| 223 | err := forEachRecord(records, func(i int, r *Record) error { |
| 224 | t := timestamp(r.Time) |
| 225 | if t == 0 { |
| 226 | t = currentTimestamp |
| 227 | } |
| 228 | if i == 0 { |
| 229 | firstTimestamp = t |
| 230 | } |
| 231 | if t > maxTimestamp { |
| 232 | maxTimestamp = t |
| 233 | } |
| 234 | |
| 235 | timestampDelta := t - firstTimestamp |
| 236 | offsetDelta := int64(i) |
| 237 | lastOffsetDelta = int32(offsetDelta) |
| 238 | |
| 239 | length := 1 + // attributes |
| 240 | sizeOfVarInt(timestampDelta) + |
| 241 | sizeOfVarInt(offsetDelta) + |
| 242 | sizeOfVarNullBytesIface(r.Key) + |
| 243 | sizeOfVarNullBytesIface(r.Value) + |
| 244 | sizeOfVarInt(int64(len(r.Headers))) |
| 245 | |
| 246 | for _, h := range r.Headers { |
| 247 | length += sizeOfVarString(h.Key) + sizeOfVarNullBytes(h.Value) |
| 248 | } |
no test coverage detected