MCPcopy
hub / github.com/IBM/sarama / newBrokerProducer

Method newBrokerProducer

async_producer.go:980–1089  ·  view source on GitHub ↗

one per broker; also constructs an associated flusher

(broker *Broker)

Source from the content-addressed store, hash-verified

978
979// one per broker; also constructs an associated flusher
980func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
981 var (
982 input = make(chan *ProducerMessage)
983 bridge = make(chan *produceSet)
984 pending = make(chan *brokerProducerResponse)
985 responses = make(chan *brokerProducerResponse)
986 )
987
988 bp := &brokerProducer{
989 parent: p,
990 broker: broker,
991 input: input,
992 output: bridge,
993 responses: responses,
994 accumulatingBatch: newProduceSet(p),
995 currentRetries: make(map[string]map[int32]error),
996 }
997 go withRecover(bp.run)
998
999 // minimal bridge to make the network response `select`able
1000 go withRecover(func() {
1001 // Use a wait group to know if we still have in flight requests
1002 var wg sync.WaitGroup
1003
1004 for set := range bridge {
1005 request := set.buildRequest()
1006
1007 // Count the in flight requests to know when we can close the pending channel safely
1008 wg.Add(1)
1009 // capture the muted set. unmuting is deferred to handleResponse to ensure that
1010 // retries block subsequent batches for the same partition.
1011 mutedSet := set
1012 sendResponse := func(response *ProduceResponse, err error) {
1013 pending <- &brokerProducerResponse{
1014 set: mutedSet,
1015 err: err,
1016 res: response,
1017 }
1018 wg.Done()
1019 }
1020
1021 if p.IsTransactional() {
1022 // Add partition to tx before sending current batch
1023 err := p.txnmgr.publishTxnPartitions()
1024 if err != nil {
1025 // Request failed to be sent
1026 sendResponse(nil, err)
1027 continue
1028 }
1029 }
1030
1031 // Use AsyncProduce vs Produce to not block waiting for the response
1032 // so that we can pipeline multiple produce requests and achieve higher throughput, see:
1033 // https://kafka.apache.org/protocol#protocol_network
1034 err := broker.AsyncProduce(request, sendResponse)
1035 if err != nil {
1036 // Request failed to be sent
1037 sendResponse(nil, err)

Callers 1

getBrokerProducer (Method) · 0.95

Calls 11

IsTransactional (Method) · 0.95
newProduceSet (Function) · 0.85
withRecover (Function) · 0.85
buildRequest (Method) · 0.80
publishTxnPartitions (Method) · 0.80
AsyncProduce (Method) · 0.80
Peek (Method) · 0.80
Remove (Method) · 0.80
Done (Method) · 0.65
Length (Method) · 0.65
Add (Method) · 0.45

Tested by

no test coverage detected