MCPcopy
hub / github.com/IBM/sarama / AsyncProduce

Method AsyncProduce

broker.go:523–560  ·  view source on GitHub ↗

AsyncProduce sends a produce request and eventually calls the provided callback with a produce response or an error. Unlike Produce, waiting for the response generally does not block. If the configured maximum number of in-flight requests is reached, the request will block.

(request *ProduceRequest, cb ProduceCallback)

Source from the content-addressed store, hash-verified

521//
522// Make sure not to Close the broker in the callback as it will lead to a deadlock.
523func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
524 b.lock.Lock()
525 defer b.lock.Unlock()
526
527 needAcks := request.RequiredAcks != NoResponse
528 // Use a nil promise when no acks is required
529 var promise *responsePromise
530
531 if needAcks {
532 metricRegistry := b.metricRegistry
533
534 // Create ProduceResponse early to provide the header version
535 res := new(ProduceResponse)
536 promise = &responsePromise{
537 response: res,
538 // Packets will be converted to a ProduceResponse in the responseReceiver goroutine
539 handler: func(packets []byte, err error) {
540 if err != nil {
541 // Failed request
542 cb(nil, err)
543 return
544 }
545
546 if err := versionedDecode(packets, res, request.version(), metricRegistry); err != nil {
547 // Malformed response
548 cb(nil, err)
549 return
550 }
551
552 // Well-formed response
553 b.handleThrottledResponse(res)
554 cb(res, nil)
555 },
556 }
557 }
558
559 return b.sendWithPromise(request, promise)
560}
561
562// Produce returns a produce response or error
563func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {

Callers 3

TestBrokerCloseFunction · 0.95
broker_test.goFile · 0.80
newBrokerProducerMethod · 0.80

Calls 4

sendWithPromiseMethod · 0.95
versionedDecodeFunction · 0.85
versionMethod · 0.65

Tested by 1

TestBrokerCloseFunction · 0.76