MCPcopy
hub / github.com/segmentio/kafka-go / bytes

Method bytes

builder_test.go:174–223  ·  builder_test.go::v2MessageSetBuilder.bytes
()

Source from the content-addressed store, hash-verified

172}
173
174func (f v2MessageSetBuilder) bytes() []byte {
175 attributes := int16(0)
176 if f.codec != nil {
177 attributes = int16(f.codec.Code()) // set codec code on attributes
178 }
179 return newWB().call(func(wb *kafkaWriteBuffer) {
180 wb.writeInt64(f.msgs[0].Offset)
181 wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
182 wb.writeInt32(0) // leader epoch
183 wb.writeInt8(2) // magic = 2
184 wb.writeInt32(0) // crc, unused
185 wb.writeInt16(attributes) // record set attributes
186 wb.writeInt32(0) // record set last offset delta
187 wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // record set first timestamp
188 wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // record set last timestamp
189 wb.writeInt64(0) // record set producer id
190 wb.writeInt16(0) // record set producer epoch
191 wb.writeInt32(0) // record set base sequence
192 wb.writeInt32(int32(len(f.msgs))) // record set count
193 bs := newWB().call(func(wb *kafkaWriteBuffer) {
194 for i, msg := range f.msgs {
195 wb.Write(newWB().call(func(wb *kafkaWriteBuffer) {
196 bs := newWB().call(func(wb *kafkaWriteBuffer) {
197 wb.writeInt8(0) // record attributes, not used here
198 wb.writeVarInt(1000 * (time.Now().Unix() - msg.Time.Unix())) // timestamp
199 wb.writeVarInt(int64(i)) // offset delta
200 wb.writeVarInt(int64(len(msg.Key))) // key len
201 wb.Write(msg.Key) // key bytes
202 wb.writeVarInt(int64(len(msg.Value))) // value len
203 wb.Write(msg.Value) // value bytes
204 wb.writeVarInt(int64(len(msg.Headers))) // number of headers
205 for _, header := range msg.Headers {
206 wb.writeVarInt(int64(len(header.Key)))
207 wb.Write([]byte(header.Key))
208 wb.writeVarInt(int64(len(header.Value)))
209 wb.Write(header.Value)
210 }
211 })
212 wb.writeVarInt(int64(len(bs)))
213 wb.Write(bs)
214 }))
215 }
216 })
217 if f.codec != nil {
218 bs = mustCompress(bs, f.codec)
219 }
220 wb.Write(bs)
221 }))
222 })
223}
224
225// kafkaWriteBuffer is a write buffer that helps writing fetch responses.
226type kafkaWriteBuffer struct {

Callers

nothing calls this directly

Calls 11

newWB (Function) · 0.85
mustCompress (Function) · 0.85
call (Method) · 0.80
Code (Method) · 0.65
writeInt64 (Method) · 0.45
writeBytes (Method) · 0.45
writeInt32 (Method) · 0.45
writeInt8 (Method) · 0.45
writeInt16 (Method) · 0.45
Write (Method) · 0.45
writeVarInt (Method) · 0.45

Tested by

no test coverage detected