Write writes given, non-empty, protobuf message to a remote storage. Depending on serialization methods, - https://github.com/planetscale/vtprotobuf methods will be used if your msg supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency - Otherwise https://github.com/gogo/pr
(ctx context.Context, msgType WriteMessageType, msg any)
| 181 | // - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and |
| 182 | // error out on unknown scheme. |
| 183 | func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ WriteResponseStats, err error) { |
| 184 | buf := r.bufPool.Get().(*[]byte) |
| 185 | |
| 186 | if err := msgType.Validate(); err != nil { |
| 187 | return WriteResponseStats{}, err |
| 188 | } |
| 189 | |
| 190 | // Encode the payload. |
| 191 | switch m := msg.(type) { |
| 192 | case vtProtoEnabled: |
| 193 | // Use optimized vtprotobuf if supported. |
| 194 | size := m.SizeVT() |
| 195 | if cap(*buf) < size { |
| 196 | *buf = make([]byte, size) |
| 197 | } else { |
| 198 | *buf = (*buf)[:size] |
| 199 | } |
| 200 | |
| 201 | if _, err := m.MarshalToSizedBufferVT(*buf); err != nil { |
| 202 | return WriteResponseStats{}, fmt.Errorf("encoding request %w", err) |
| 203 | } |
| 204 | case gogoProtoEnabled: |
| 205 | // Gogo proto if supported. |
| 206 | size := m.Size() |
| 207 | if cap(*buf) < size { |
| 208 | *buf = make([]byte, size) |
| 209 | } else { |
| 210 | *buf = (*buf)[:size] |
| 211 | } |
| 212 | |
| 213 | if _, err := m.MarshalToSizedBuffer(*buf); err != nil { |
| 214 | return WriteResponseStats{}, fmt.Errorf("encoding request %w", err) |
| 215 | } |
| 216 | case proto.Message: |
| 217 | // Generic proto. |
| 218 | *buf, err = (proto.MarshalOptions{}).MarshalAppend(*buf, m) |
| 219 | if err != nil { |
| 220 | return WriteResponseStats{}, fmt.Errorf("encoding request %w", err) |
| 221 | } |
| 222 | default: |
| 223 | return WriteResponseStats{}, fmt.Errorf("unknown message type %T", m) |
| 224 | } |
| 225 | |
| 226 | comprBuf := r.bufPool.Get().(*[]byte) |
| 227 | payload, err := compressPayload(comprBuf, r.opts.compression, *buf) |
| 228 | if err != nil { |
| 229 | return WriteResponseStats{}, fmt.Errorf("compressing %w", err) |
| 230 | } |
| 231 | r.bufPool.Put(buf) |
| 232 | r.bufPool.Put(comprBuf) |
| 233 | |
| 234 | // Since we retry writes we need to track the total amount of accepted data |
| 235 | // across the various attempts. |
| 236 | accumulatedStats := WriteResponseStats{} |
| 237 | |
| 238 | b := backoff.New(ctx, r.opts.backoff) |
| 239 | for { |
| 240 | rs, err := r.attemptWrite(ctx, r.opts.compression, msgType, payload, b.NumRetries()) |