(buffer *pageBuffer, bufferOffset int64)
| 150 | } |
| 151 | |
| 152 | func (rs *RecordSet) writeToVersion1(buffer *pageBuffer, bufferOffset int64) error { |
| 153 | attributes := rs.Attributes |
| 154 | records := rs.Records |
| 155 | |
| 156 | if compression := attributes.Compression(); compression != 0 { |
| 157 | if codec := compression.Codec(); codec != nil { |
| 158 | // In the message format version 1, compression is achieved by |
| 159 | // compressing the value of a message which recursively contains |
| 160 | // the representation of the compressed message set. |
| 161 | subset := *rs |
| 162 | subset.Attributes &= ^7 // erase compression |
| 163 | |
| 164 | if err := subset.writeToVersion1(buffer, bufferOffset); err != nil { |
| 165 | return err |
| 166 | } |
| 167 | |
| 168 | compressed := newPageBuffer() |
| 169 | defer compressed.unref() |
| 170 | |
| 171 | compressor := codec.NewWriter(compressed) |
| 172 | defer compressor.Close() |
| 173 | |
| 174 | var err error |
| 175 | buffer.pages.scan(bufferOffset, buffer.Size(), func(b []byte) bool { |
| 176 | _, err = compressor.Write(b) |
| 177 | return err == nil |
| 178 | }) |
| 179 | if err != nil { |
| 180 | return err |
| 181 | } |
| 182 | if err := compressor.Close(); err != nil { |
| 183 | return err |
| 184 | } |
| 185 | |
| 186 | buffer.Truncate(int(bufferOffset)) |
| 187 | |
| 188 | records = &message{ |
| 189 | Record: Record{ |
| 190 | Value: compressed, |
| 191 | }, |
| 192 | } |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | e := encoder{writer: buffer} |
| 197 | currentTimestamp := timestamp(time.Now()) |
| 198 | |
| 199 | return forEachRecord(records, func(i int, r *Record) error { |
| 200 | t := timestamp(r.Time) |
| 201 | if t == 0 { |
| 202 | t = currentTimestamp |
| 203 | } |
| 204 | |
| 205 | messageOffset := buffer.Size() |
| 206 | e.writeInt64(int64(i)) |
| 207 | e.writeInt32(0) // message size placeholder |
| 208 | e.writeInt32(0) // crc32 placeholder |
| 209 | e.setCRC(crc32.IEEETable) |
no test coverage detected