()
| 84 | } |
| 85 | |
| 86 | func (f v0MessageSetBuilder) bytes() []byte { |
| 87 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 88 | for _, msg := range f.msgs { |
| 89 | bs := newWB().call(func(wb *kafkaWriteBuffer) { |
| 90 | wb.writeInt64(msg.Offset) // offset |
| 91 | wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) { |
| 92 | wb.writeInt32(-1) // crc, unused |
| 93 | wb.writeInt8(0) // magic |
| 94 | wb.writeInt8(0) // attributes -- zero, no compression for the inner message |
| 95 | wb.writeBytes(msg.Key) |
| 96 | wb.writeBytes(msg.Value) |
| 97 | })) |
| 98 | }) |
| 99 | wb.Write(bs) |
| 100 | } |
| 101 | }) |
| 102 | if f.codec != nil { |
| 103 | bs = newWB().call(func(wb *kafkaWriteBuffer) { |
| 104 | wb.writeInt64(f.msgs[0].Offset) // offset |
| 105 | wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) { |
| 106 | compressed := mustCompress(bs, f.codec) |
| 107 | wb.writeInt32(-1) // crc, unused |
| 108 | wb.writeInt8(0) // magic |
| 109 | wb.writeInt8(f.codec.Code()) // attributes |
| 110 | wb.writeBytes(nil) // key is always nil for compressed |
| 111 | wb.writeBytes(compressed) // the value is the compressed message |
| 112 | })) |
| 113 | }) |
| 114 | } |
| 115 | return bs |
| 116 | } |
| 117 | |
| 118 | type v1MessageSetBuilder struct { |
| 119 | msgs []Message |
nothing calls this directly
no test coverage detected