ProduceSync produces records to Kafka and returns once all records have been successfully committed, or an error occurred. This function honors the configure max buffered bytes and refuse to produce a record, returnin kgo.ErrMaxBuffered, if the configured limit is reached.
(ctx context.Context, records []*kgo.Record)
| 268 | // This function honors the configure max buffered bytes and refuse to produce a record, returnin kgo.ErrMaxBuffered, |
| 269 | // if the configured limit is reached. |
| 270 | func (c *Producer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults { |
| 271 | var ( |
| 272 | remaining = atomic.NewInt64(int64(len(records))) |
| 273 | done = make(chan struct{}) |
| 274 | resMx sync.Mutex |
| 275 | res = make(kgo.ProduceResults, 0, len(records)) |
| 276 | ) |
| 277 | |
| 278 | // As a safety mechanism, we want to make sure that the context is not already canceled or its deadline exceeded. |
| 279 | // The reason is that once we buffer records to the Kafka client (later in this function), these records will be |
| 280 | // sent to the Kafka backend regardless the context is canceled or not. There's no way, in the Kafka client, to |
| 281 | // pull out records from a batch buffer. So, if the context is canceled, we circuit break instead of buffering |
| 282 | // records that may be sent to the Kafka backend, but that the caller will not know about because context is done. |
| 283 | if err := ctx.Err(); err != nil { |
| 284 | recordsCount := float64(len(records)) |
| 285 | c.produceRecordsFailuresTotal.WithLabelValues("cancelled-before-producing").Add(recordsCount) |
| 286 | // We wrap the error to make it cristal clear where the context canceled/timeout comes from. |
| 287 | return kgo.ProduceResults{{Err: fmt.Errorf("skipped producing Kafka records because context is already done: %w", err)}} |
| 288 | } |
| 289 | |
| 290 | onProduceDone := func(r *kgo.Record, err error) { |
| 291 | if c.maxBufferedBytes > 0 { |
| 292 | c.bufferedBytes.Add(-int64(len(r.Value))) |
| 293 | } |
| 294 | |
| 295 | resMx.Lock() |
| 296 | res = append(res, kgo.ProduceResult{Record: r, Err: err}) |
| 297 | resMx.Unlock() |
| 298 | |
| 299 | if err != nil { |
| 300 | c.produceRecordsFailuresTotal.WithLabelValues(produceErrReason(err)).Inc() |
| 301 | } |
| 302 | |
| 303 | // In case of error we'll wait for all responses anyway before returning from produceSync(). |
| 304 | // It allows us to keep code easier, given we don't expect this function to be frequently |
| 305 | // called with multiple records. |
| 306 | if remaining.Dec() == 0 { |
| 307 | close(done) |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | for _, record := range records { |
| 312 | // Fast fail if the Kafka client buffer is full. Buffered bytes counter is decreased onProducerDone(). |
| 313 | if c.maxBufferedBytes > 0 && c.bufferedBytes.Add(int64(len(record.Value))) > c.maxBufferedBytes { |
| 314 | onProduceDone(record, kgo.ErrMaxBuffered) |
| 315 | continue |
| 316 | } |
| 317 | |
| 318 | // We use a new context to avoid that other Produce() may be cancelled when this call's context is |
| 319 | // canceled. It's important to note that cancelling the context passed to Produce() doesn't actually |
| 320 | // prevent the data to be sent over the wire (because it's never removed from the buffer) but in some |
| 321 | // cases may cause all requests to fail with context cancelled. |
| 322 | // |
| 323 | // Produce() may theoretically block if the buffer is full, but we configure the Kafka client with |
| 324 | // unlimited buffer because we implement the buffer limit ourselves (see maxBufferedBytes). This means |
| 325 | // Produce() should never block for us in practice. |
| 326 | c.Produce(context.WithoutCancel(ctx), record, onProduceDone) |
| 327 | } |