()
| 172 | } |
| 173 | |
| 174 | func (f v2MessageSetBuilder) bytes() []byte { |
| 175 | attributes := int16(0) |
| 176 | if f.codec != nil { |
| 177 | attributes = int16(f.codec.Code()) // set codec code on attributes |
| 178 | } |
| 179 | return newWB().call(func(wb *kafkaWriteBuffer) { |
| 180 | wb.writeInt64(f.msgs[0].Offset) |
| 181 | wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) { |
| 182 | wb.writeInt32(0) // leader epoch |
| 183 | wb.writeInt8(2) // magic = 2 |
| 184 | wb.writeInt32(0) // crc, unused |
| 185 | wb.writeInt16(attributes) // record set attributes |
| 186 | wb.writeInt32(0) // record set last offset delta |
| 187 | wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // record set first timestamp |
| 188 | wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // record set last timestamp |
| 189 | wb.writeInt64(0) // record set producer id |
| 190 | wb.writeInt16(0) // record set producer epoch |
| 191 | wb.writeInt32(0) // record set base sequence |
| 192 | wb.writeInt32(int32(len(f.msgs))) // record set count |
| 193 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 194 | for i, msg := range f.msgs { |
| 195 | wb.Write(newWB().call(func(wb *kafkaWriteBuffer) { |
| 196 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 197 | wb.writeInt8(0) // record attributes, not used here |
| 198 | wb.writeVarInt(1000 * (time.Now().Unix() - msg.Time.Unix())) // timestamp |
| 199 | wb.writeVarInt(int64(i)) // offset delta |
| 200 | wb.writeVarInt(int64(len(msg.Key))) // key len |
| 201 | wb.Write(msg.Key) // key bytes |
| 202 | wb.writeVarInt(int64(len(msg.Value))) // value len |
| 203 | wb.Write(msg.Value) // value bytes |
| 204 | wb.writeVarInt(int64(len(msg.Headers))) // number of headers |
| 205 | for _, header := range msg.Headers { |
| 206 | wb.writeVarInt(int64(len(header.Key))) |
| 207 | wb.Write([]byte(header.Key)) |
| 208 | wb.writeVarInt(int64(len(header.Value))) |
| 209 | wb.Write(header.Value) |
| 210 | } |
| 211 | }) |
| 212 | wb.writeVarInt(int64(len(bs))) |
| 213 | wb.Write(bs) |
| 214 | })) |
| 215 | } |
| 216 | }) |
| 217 | if f.codec != nil { |
| 218 | bs = mustCompress(bs, f.codec) |
| 219 | } |
| 220 | wb.Write(bs) |
| 221 | })) |
| 222 | }) |
| 223 | } |
| 224 | |
| 225 | // kafkaWriteBuffer is a write buffer that helps writing fetch responses. |
| 226 | type kafkaWriteBuffer struct { |
nothing calls this directly
no test coverage detected