MCPcopy
hub / github.com/segmentio/kafka-go / writeToVersion2

Method writeToVersion2

protocol/record_v2.go:191–315  ·  view source on GitHub ↗
(buffer *pageBuffer, bufferOffset int64)

Source from the content-addressed store, hash-verified

189}
190
191func (rs *RecordSet) writeToVersion2(buffer *pageBuffer, bufferOffset int64) error {
192 records := rs.Records
193 numRecords := int32(0)
194
195 e := &encoder{writer: buffer}
196 e.writeInt64(0) // base offset | 0 +8
197 e.writeInt32(0) // placeholder for record batch length | 8 +4
198 e.writeInt32(-1) // partition leader epoch | 12 +4
199 e.writeInt8(2) // magic byte | 16 +1
200 e.writeInt32(0) // placeholder for crc32 checksum | 17 +4
201 e.writeInt16(int16(rs.Attributes)) // attributes | 21 +2
202 e.writeInt32(0) // placeholder for lastOffsetDelta | 23 +4
203 e.writeInt64(0) // placeholder for firstTimestamp | 27 +8
204 e.writeInt64(0) // placeholder for maxTimestamp | 35 +8
205 e.writeInt64(-1) // producer id | 43 +8
206 e.writeInt16(-1) // producer epoch | 51 +2
207 e.writeInt32(-1) // base sequence | 53 +4
208 e.writeInt32(0) // placeholder for numRecords | 57 +4
209
210 var compressor io.WriteCloser
211 if compression := rs.Attributes.Compression(); compression != 0 {
212 if codec := compression.Codec(); codec != nil {
213 compressor = codec.NewWriter(buffer)
214 e.writer = compressor
215 }
216 }
217
218 currentTimestamp := timestamp(time.Now())
219 lastOffsetDelta := int32(0)
220 firstTimestamp := int64(0)
221 maxTimestamp := int64(0)
222
223 err := forEachRecord(records, func(i int, r *Record) error {
224 t := timestamp(r.Time)
225 if t == 0 {
226 t = currentTimestamp
227 }
228 if i == 0 {
229 firstTimestamp = t
230 }
231 if t > maxTimestamp {
232 maxTimestamp = t
233 }
234
235 timestampDelta := t - firstTimestamp
236 offsetDelta := int64(i)
237 lastOffsetDelta = int32(offsetDelta)
238
239 length := 1 + // attributes
240 sizeOfVarInt(timestampDelta) +
241 sizeOfVarInt(offsetDelta) +
242 sizeOfVarNullBytesIface(r.Key) +
243 sizeOfVarNullBytesIface(r.Value) +
244 sizeOfVarInt(int64(len(r.Headers)))
245
246 for _, h := range r.Headers {
247 length += sizeOfVarString(h.Key) + sizeOfVarNullBytes(h.Value)
248 }

Callers 1

WriteToMethod · 0.95

Calls 15

writeInt64Method · 0.95
writeInt32Method · 0.95
writeInt8Method · 0.95
writeInt16Method · 0.95
writeVarIntMethod · 0.95
writeVarNullBytesFromMethod · 0.95
writeVarStringMethod · 0.95
writeVarNullBytesMethod · 0.95
forEachRecordFunction · 0.85
sizeOfVarIntFunction · 0.85
sizeOfVarNullBytesIfaceFunction · 0.85
sizeOfVarStringFunction · 0.85

Tested by

no test coverage detected