MCPcopy
hub / github.com/prometheus/client_golang / Write

Method Write

exp/api/remote/remote_api.go:183–280  ·  view source on GitHub ↗

Write writes the given, non-empty protobuf message to a remote storage. Depending on the serialization methods your msg supports: - https://github.com/planetscale/vtprotobuf methods will be used if your msg supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency - Otherwise https://github.com/gogo/pr

(ctx context.Context, msgType WriteMessageType, msg any)

Source from the content-addressed store, hash-verified

181// - If neither is supported, it will be marshaled using generic google.golang.org/protobuf methods and
182// error out on unknown scheme.
183func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ WriteResponseStats, err error) {
184 buf := r.bufPool.Get().(*[]byte)
185
186 if err := msgType.Validate(); err != nil {
187 return WriteResponseStats{}, err
188 }
189
190 // Encode the payload.
191 switch m := msg.(type) {
192 case vtProtoEnabled:
193 // Use optimized vtprotobuf if supported.
194 size := m.SizeVT()
195 if cap(*buf) < size {
196 *buf = make([]byte, size)
197 } else {
198 *buf = (*buf)[:size]
199 }
200
201 if _, err := m.MarshalToSizedBufferVT(*buf); err != nil {
202 return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
203 }
204 case gogoProtoEnabled:
205 // Gogo proto if supported.
206 size := m.Size()
207 if cap(*buf) < size {
208 *buf = make([]byte, size)
209 } else {
210 *buf = (*buf)[:size]
211 }
212
213 if _, err := m.MarshalToSizedBuffer(*buf); err != nil {
214 return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
215 }
216 case proto.Message:
217 // Generic proto.
218 *buf, err = (proto.MarshalOptions{}).MarshalAppend(*buf, m)
219 if err != nil {
220 return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
221 }
222 default:
223 return WriteResponseStats{}, fmt.Errorf("unknown message type %T", m)
224 }
225
226 comprBuf := r.bufPool.Get().(*[]byte)
227 payload, err := compressPayload(comprBuf, r.opts.compression, *buf)
228 if err != nil {
229 return WriteResponseStats{}, fmt.Errorf("compressing %w", err)
230 }
231 r.bufPool.Put(buf)
232 r.bufPool.Put(comprBuf)
233
234 // Since we retry writes we need to track the total amount of accepted data
235 // across the various attempts.
236 accumulatedStats := WriteResponseStats{}
237
238 b := backoff.New(ctx, r.opts.backoff)
239 for {
240 rs, err := r.attemptWrite(ctx, r.opts.compression, msgType, payload, b.NumRetries())

Callers 1

Calls 15

attemptWriteMethod · 0.95
AddMethod · 0.95
NoDataWrittenMethod · 0.95
RetryAfterMethod · 0.95
NewFunction · 0.92
compressPayloadFunction · 0.85
ValidateMethod · 0.80
SizeMethod · 0.80
MarshalToSizedBufferMethod · 0.80
NumRetriesMethod · 0.80
OngoingMethod · 0.80
NextDelayMethod · 0.80

Tested by 1