AsyncProduce sends a produce request and eventually calls the provided callback with a produce response or an error. Unlike Produce, waiting for the response is generally non-blocking. If the configured maximum number of in-flight requests is reached, then the request will be blocked
(request *ProduceRequest, cb ProduceCallback)
| 521 | // |
| 522 | // Make sure not to Close the broker in the callback as it will lead to a deadlock. |
| 523 | func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error { |
| 524 | b.lock.Lock() |
| 525 | defer b.lock.Unlock() |
| 526 | |
| 527 | needAcks := request.RequiredAcks != NoResponse |
| 528 | // Use a nil promise when no acks is required |
| 529 | var promise *responsePromise |
| 530 | |
| 531 | if needAcks { |
| 532 | metricRegistry := b.metricRegistry |
| 533 | |
| 534 | // Create ProduceResponse early to provide the header version |
| 535 | res := new(ProduceResponse) |
| 536 | promise = &responsePromise{ |
| 537 | response: res, |
| 538 | // Packets will be converted to a ProduceResponse in the responseReceiver goroutine |
| 539 | handler: func(packets []byte, err error) { |
| 540 | if err != nil { |
| 541 | // Failed request |
| 542 | cb(nil, err) |
| 543 | return |
| 544 | } |
| 545 | |
| 546 | if err := versionedDecode(packets, res, request.version(), metricRegistry); err != nil { |
| 547 | // Malformed response |
| 548 | cb(nil, err) |
| 549 | return |
| 550 | } |
| 551 | |
| 552 | // Well-formed response |
| 553 | b.handleThrottledResponse(res) |
| 554 | cb(res, nil) |
| 555 | }, |
| 556 | } |
| 557 | } |
| 558 | |
| 559 | return b.sendWithPromise(request, promise) |
| 560 | } |
| 561 | |
| 562 | // Produce returns a produce response or error |
| 563 | func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { |