MCPcopy
hub / github.com/segmentio/kafka-go / bytes

Method bytes

builder_test.go:127–163  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

125}
126
127func (f v1MessageSetBuilder) bytes() []byte {
128 bs := newWB().call(func(wb *kafkaWriteBuffer) {
129 for i, msg := range f.msgs {
130 bs := newWB().call(func(wb *kafkaWriteBuffer) {
131 if f.codec != nil {
132 wb.writeInt64(int64(i)) // compressed inner message offsets are relative
133 } else {
134 wb.writeInt64(msg.Offset) // offset
135 }
136 wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
137 wb.writeInt32(-1) // crc, unused
138 wb.writeInt8(1) // magic
139 wb.writeInt8(0) // attributes -- zero, no compression for the inner message
140 wb.writeInt64(1000 * msg.Time.Unix()) // timestamp
141 wb.writeBytes(msg.Key)
142 wb.writeBytes(msg.Value)
143 }))
144 })
145 wb.Write(bs)
146 }
147 })
148 if f.codec != nil {
149 bs = newWB().call(func(wb *kafkaWriteBuffer) {
150 wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages
151 wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
152 bs := mustCompress(bs, f.codec)
153 wb.writeInt32(-1) // crc, unused
154 wb.writeInt8(1) // magic
155 wb.writeInt8(f.codec.Code()) // attributes
156 wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // timestamp
157 wb.writeBytes(nil) // key is always nil for compressed
158 wb.writeBytes(bs) // the value is the compressed message
159 }))
160 })
161 }
162 return bs
163}
164
165type v2MessageSetBuilder struct {
166 msgs []Message

Callers

nothing calls this directly

Calls 9

newWBFunction · 0.85
mustCompressFunction · 0.85
callMethod · 0.80
CodeMethod · 0.65
writeInt64Method · 0.45
writeBytesMethod · 0.45
writeInt32Method · 0.45
writeInt8Method · 0.45
WriteMethod · 0.45

Tested by

no test coverage detected