MCPcopy
hub / github.com/segmentio/kafka-go / writeToVersion1

Method writeToVersion1

protocol/record_v1.go:152–230  ·  view source on GitHub ↗
(buffer *pageBuffer, bufferOffset int64)

Source from the content-addressed store, hash-verified

150}
151
152func (rs *RecordSet) writeToVersion1(buffer *pageBuffer, bufferOffset int64) error {
153 attributes := rs.Attributes
154 records := rs.Records
155
156 if compression := attributes.Compression(); compression != 0 {
157 if codec := compression.Codec(); codec != nil {
158 // In the message format version 1, compression is achieved by
159 // compressing the value of a message which recursively contains
160 // the representation of the compressed message set.
161 subset := *rs
162 subset.Attributes &= ^7 // erase compression
163
164 if err := subset.writeToVersion1(buffer, bufferOffset); err != nil {
165 return err
166 }
167
168 compressed := newPageBuffer()
169 defer compressed.unref()
170
171 compressor := codec.NewWriter(compressed)
172 defer compressor.Close()
173
174 var err error
175 buffer.pages.scan(bufferOffset, buffer.Size(), func(b []byte) bool {
176 _, err = compressor.Write(b)
177 return err == nil
178 })
179 if err != nil {
180 return err
181 }
182 if err := compressor.Close(); err != nil {
183 return err
184 }
185
186 buffer.Truncate(int(bufferOffset))
187
188 records = &message{
189 Record: Record{
190 Value: compressed,
191 },
192 }
193 }
194 }
195
196 e := encoder{writer: buffer}
197 currentTimestamp := timestamp(time.Now())
198
199 return forEachRecord(records, func(i int, r *Record) error {
200 t := timestamp(r.Time)
201 if t == 0 {
202 t = currentTimestamp
203 }
204
205 messageOffset := buffer.Size()
206 e.writeInt64(int64(i))
207 e.writeInt32(0) // message size placeholder
208 e.writeInt32(0) // crc32 placeholder
209 e.setCRC(crc32.IEEETable)

Callers 1

WriteTo (Method) · 0.95

Calls 15

Close (Method) · 0.95
writeInt64 (Method) · 0.95
writeInt32 (Method) · 0.95
setCRC (Method) · 0.95
writeInt8 (Method) · 0.95
writeNullBytesFrom (Method) · 0.95
newPageBuffer (Function) · 0.85
forEachRecord (Function) · 0.85
packUint32 (Function) · 0.85
Compression (Method) · 0.80
Codec (Method) · 0.80
timestamp (Function) · 0.70

Tested by

no test coverage detected