()
| 125 | } |
| 126 | |
| 127 | func (f v1MessageSetBuilder) bytes() []byte { |
| 128 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 129 | for i, msg := range f.msgs { |
| 130 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 131 | if f.codec != nil { |
| 132 | wb.writeInt64(int64(i)) // compressed inner message offsets are relative |
| 133 | } else { |
| 134 | wb.writeInt64(msg.Offset) // offset |
| 135 | } |
| 136 | wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) { |
| 137 | wb.writeInt32(-1) // crc, unused |
| 138 | wb.writeInt8(1) // magic |
| 139 | wb.writeInt8(0) // attributes -- zero, no compression for the inner message |
| 140 | wb.writeInt64(1000 * msg.Time.Unix()) // timestamp |
| 141 | wb.writeBytes(msg.Key) |
| 142 | wb.writeBytes(msg.Value) |
| 143 | })) |
| 144 | }) |
| 145 | wb.Write(bs) |
| 146 | } |
| 147 | }) |
| 148 | if f.codec != nil { |
| 149 | bs = newWB().call(func(wb *kafkaWriteBuffer) { |
| 150 | wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages |
| 151 | wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) { |
| 152 | bs := mustCompress(bs, f.codec) |
| 153 | wb.writeInt32(-1) // crc, unused |
| 154 | wb.writeInt8(1) // magic |
| 155 | wb.writeInt8(f.codec.Code()) // attributes |
| 156 | wb.writeInt64(1000 * f.msgs[0].Time.Unix()) // timestamp |
| 157 | wb.writeBytes(nil) // key is always nil for compressed |
| 158 | wb.writeBytes(bs) // the value is the compressed message |
| 159 | })) |
| 160 | }) |
| 161 | } |
| 162 | return bs |
| 163 | } |
| 164 | |
| 165 | type v2MessageSetBuilder struct { |
| 166 | msgs []Message |
nothing calls this directly
no test coverage detected