hub / github.com/grafana/tempo / ProduceSync

Method ProduceSync

pkg/ingest/writer_client.go:270–337

ProduceSync produces records to Kafka and returns once all records have been successfully committed or an error has occurred. It honors the configured maximum buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered, if the configured limit is reached.

(ctx context.Context, records []*kgo.Record) kgo.ProduceResults
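
The buffer limit described above can be pictured as a byte counter that is incremented when a record is admitted and decremented when the record completes. The following is a minimal sketch of that idea, not Tempo's implementation: `byteBudget`, `reserve`, `release`, and `errBufferFull` are hypothetical names, `errBufferFull` stands in for kgo.ErrMaxBuffered, and the standard library's `sync/atomic` is used in place of whatever atomic package the real code imports.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errBufferFull is a hypothetical stand-in for kgo.ErrMaxBuffered.
var errBufferFull = errors.New("buffer full")

// byteBudget (hypothetical) counts the bytes currently in flight.
type byteBudget struct {
	limit    int64 // 0 disables the limit
	buffered atomic.Int64
}

// reserve adds n bytes to the counter and returns errBufferFull when the new
// total exceeds the limit. The bytes stay counted even on rejection, so the
// caller must always call release(n) once the record is done.
func (b *byteBudget) reserve(n int) error {
	if b.limit > 0 && b.buffered.Add(int64(n)) > b.limit {
		return errBufferFull
	}
	return nil
}

// release subtracts n bytes from the counter when the limit is enabled.
func (b *byteBudget) release(n int) {
	if b.limit > 0 {
		b.buffered.Add(-int64(n))
	}
}

func main() {
	b := &byteBudget{limit: 10}
	for _, payload := range []string{"hello", "world", "!"} {
		err := b.reserve(len(payload))
		fmt.Printf("%q: %v\n", payload, err)
		if err != nil {
			// A rejected record completes immediately, so give its bytes back.
			b.release(len(payload))
		}
	}
}
```

Adding first and comparing afterwards keeps the check to a single atomic operation. The cost is that a rejected record is briefly counted, which is why the completion path has to run for rejected records as well.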


// This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered,
// if the configured limit is reached.
func (c *Producer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults {
	var (
		remaining = atomic.NewInt64(int64(len(records)))
		done      = make(chan struct{})
		resMx     sync.Mutex
		res       = make(kgo.ProduceResults, 0, len(records))
	)

	// As a safety mechanism, we want to make sure that the context is not already canceled or its deadline exceeded.
	// The reason is that once we buffer records to the Kafka client (later in this function), these records will be
	// sent to the Kafka backend regardless of whether the context is canceled. There's no way, in the Kafka client,
	// to pull records out of a batch buffer. So, if the context is canceled, we circuit break instead of buffering
	// records that may be sent to the Kafka backend, but that the caller will not know about because context is done.
	if err := ctx.Err(); err != nil {
		recordsCount := float64(len(records))
		c.produceRecordsFailuresTotal.WithLabelValues("cancelled-before-producing").Add(recordsCount)
		// We wrap the error to make it crystal clear where the context cancellation/timeout comes from.
		return kgo.ProduceResults{{Err: fmt.Errorf("skipped producing Kafka records because context is already done: %w", err)}}
	}

	onProduceDone := func(r *kgo.Record, err error) {
		if c.maxBufferedBytes > 0 {
			c.bufferedBytes.Add(-int64(len(r.Value)))
		}

		resMx.Lock()
		res = append(res, kgo.ProduceResult{Record: r, Err: err})
		resMx.Unlock()

		if err != nil {
			c.produceRecordsFailuresTotal.WithLabelValues(produceErrReason(err)).Inc()
		}

		// In case of error we'll wait for all responses anyway before returning from ProduceSync().
		// This keeps the code simpler, given we don't expect this function to be frequently
		// called with multiple records.
		if remaining.Dec() == 0 {
			close(done)
		}
	}

	for _, record := range records {
		// Fast fail if the Kafka client buffer is full. The buffered bytes counter is decreased in onProduceDone().
		if c.maxBufferedBytes > 0 && c.bufferedBytes.Add(int64(len(record.Value))) > c.maxBufferedBytes {
			onProduceDone(record, kgo.ErrMaxBuffered)
			continue
		}

		// We use a new context so that other Produce() calls are not canceled when this call's context is
		// canceled. It's important to note that canceling the context passed to Produce() doesn't actually
		// prevent the data from being sent over the wire (because it's never removed from the buffer) but in
		// some cases may cause all requests to fail with context canceled.
		//
		// Produce() may theoretically block if the buffer is full, but we configure the Kafka client with
		// unlimited buffer because we implement the buffer limit ourselves (see maxBufferedBytes). This means
		// Produce() should never block for us in practice.
		c.Produce(context.WithoutCancel(ctx), record, onProduceDone)
	}
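
The listing combines three ideas: an early exit when the context is already done, a shared countdown that closes a channel once every callback has fired, and a detached context (context.WithoutCancel) for the asynchronous call. The standalone sketch below illustrates that pattern under stated assumptions. `asyncSend`, `sendAll`, `result`, and `runDemo` are hypothetical names, `asyncSend` merely simulates an asynchronous client, and the final `select` is this sketch's own way of waiting, since the listing is cut off before that point.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// result (hypothetical) pairs an item with the error reported by its callback.
type result struct {
	item string
	err  error
}

// asyncSend is a hypothetical stand-in for an asynchronous client call: it
// returns immediately and invokes cb later from another goroutine.
func asyncSend(ctx context.Context, item string, cb func(string, error)) {
	go func() {
		time.Sleep(10 * time.Millisecond)
		cb(item, ctx.Err()) // always nil for a context made by WithoutCancel
	}()
}

// sendAll submits every item, then waits until all callbacks have fired or
// the caller's context is done.
func sendAll(ctx context.Context, items []string) ([]result, error) {
	// Refuse to start if the caller has already given up.
	if err := ctx.Err(); err != nil {
		return nil, fmt.Errorf("skipped sending because context is already done: %w", err)
	}
	if len(items) == 0 {
		return nil, nil // no callbacks would ever fire, so do not wait
	}

	var (
		remaining atomic.Int64
		done      = make(chan struct{})
		mu        sync.Mutex
		results   = make([]result, 0, len(items))
	)
	remaining.Store(int64(len(items)))

	onDone := func(item string, err error) {
		mu.Lock()
		results = append(results, result{item: item, err: err})
		mu.Unlock()

		// The callback that brings the counter to zero signals completion.
		if remaining.Add(-1) == 0 {
			close(done)
		}
	}

	for _, item := range items {
		// Detach from the caller's cancellation: the send is already in
		// flight and cannot be recalled.
		asyncSend(context.WithoutCancel(ctx), item, onDone)
	}

	select {
	case <-ctx.Done():
		return nil, context.Cause(ctx)
	case <-done:
		// No callback can append after done is closed, so results is safe to read.
		return results, nil
	}
}

// runDemo sends three items with a generous timeout and reports how many
// results came back.
func runDemo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	results, err := sendAll(ctx, []string{"a", "b", "c"})
	return len(results), err
}

func main() {
	fmt.Println(runDemo())
}
```

Closing the channel from the callback that reaches zero means the waiter needs only one receive, however many items were submitted. The sketch returns early for an empty input because no callback would ever close the channel in that case.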

Calls 4

produceErrReason (Function) · 0.85
Add (Method) · 0.65
Inc (Method) · 0.65
Done (Method) · 0.65