One brokerProducer is created per broker; this also constructs an associated flusher.
(broker *Broker)
| 978 | |
| 979 | // newBrokerProducer creates the brokerProducer for the given broker (one per broker); it also constructs an associated flusher. |
| 980 | func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { |
| 981 | var ( |
| 982 | input = make(chan *ProducerMessage) |
| 983 | bridge = make(chan *produceSet) |
| 984 | pending = make(chan *brokerProducerResponse) |
| 985 | responses = make(chan *brokerProducerResponse) |
| 986 | ) |
| 987 | |
| 988 | bp := &brokerProducer{ |
| 989 | parent: p, |
| 990 | broker: broker, |
| 991 | input: input, |
| 992 | output: bridge, |
| 993 | responses: responses, |
| 994 | accumulatingBatch: newProduceSet(p), |
| 995 | currentRetries: make(map[string]map[int32]error), |
| 996 | } |
| 997 | go withRecover(bp.run) |
| 998 | |
| 999 | // minimal bridge to make the network response `select`able |
| 1000 | go withRecover(func() { |
| 1001 | // Use a wait group to know if we still have in flight requests |
| 1002 | var wg sync.WaitGroup |
| 1003 | |
| 1004 | for set := range bridge { |
| 1005 | request := set.buildRequest() |
| 1006 | |
| 1007 | // Count the in flight requests to know when we can close the pending channel safely |
| 1008 | wg.Add(1) |
| 1009 | // capture the muted set. unmuting is deferred to handleResponse to ensure that |
| 1010 | // retries block subsequent batches for the same partition. |
| 1011 | mutedSet := set |
| 1012 | sendResponse := func(response *ProduceResponse, err error) { |
| 1013 | pending <- &brokerProducerResponse{ |
| 1014 | set: mutedSet, |
| 1015 | err: err, |
| 1016 | res: response, |
| 1017 | } |
| 1018 | wg.Done() |
| 1019 | } |
| 1020 | |
| 1021 | if p.IsTransactional() { |
| 1022 | // Add partition to tx before sending current batch |
| 1023 | err := p.txnmgr.publishTxnPartitions() |
| 1024 | if err != nil { |
| 1025 | // Request failed to be sent |
| 1026 | sendResponse(nil, err) |
| 1027 | continue |
| 1028 | } |
| 1029 | } |
| 1030 | |
| 1031 | // Use AsyncProduce vs Produce to not block waiting for the response |
| 1032 | // so that we can pipeline multiple produce requests and achieve higher throughput, see: |
| 1033 | // https://kafka.apache.org/protocol#protocol_network |
| 1034 | err := broker.AsyncProduce(request, sendResponse) |
| 1035 | if err != nil { |
| 1036 | // Request failed to be sent |
| 1037 | sendResponse(nil, err) |
no test coverage detected