MCPcopy
hub / github.com/IBM/sarama / dispatcher

Method dispatcher

async_producer.go:587–669  ·  view source on GitHub ↗

Singleton; dispatches messages by topic.

()

Source from the content-addressed store, hash-verified

585// singleton
586// dispatches messages by topic
587func (p *asyncProducer) dispatcher() {
588 handlers := make(map[string]chan<- *ProducerMessage)
589 shuttingDown := false
590
591 for msg := range p.input {
592 if msg == nil {
593 Logger.Println("Something tried to send a nil message, it was ignored.")
594 continue
595 }
596
597 if msg.flags&endtxn != 0 {
598 var err error
599 if msg.flags&committxn != 0 {
600 err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
601 } else {
602 err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
603 }
604 if err != nil {
605 Logger.Printf("producer/txnmgr unable to end transaction %s", err)
606 }
607 p.inFlight.Done()
608 continue
609 }
610
611 if msg.flags&shutdown != 0 {
612 shuttingDown = true
613 p.inFlight.Done()
614 continue
615 }
616
617 if msg.retries == 0 {
618 if shuttingDown {
619 // we can't just call returnError here because that decrements the wait group,
620 // which hasn't been incremented yet for this message, and shouldn't be
621 pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
622 if p.conf.Producer.Return.Errors {
623 p.errors <- pErr
624 } else {
625 Logger.Println(pErr)
626 }
627 continue
628 }
629 p.inFlight.Add(1)
630 // Ignore retried messages; they are already in a transaction.
631 // Can't produce new record when transaction is not started.
632 if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
633 Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
634 p.returnError(msg, ErrTransactionNotReady)
635 continue
636 }
637 }
638
639 for _, interceptor := range p.conf.Producer.Interceptors {
640 msg.safelyApplyInterceptor(interceptor)
641 }
642
643 version := 1
644 if p.conf.Version.IsAtLeast(V0_11_0_0) {

Callers

nothing calls this directly

Calls 13

IsTransactionalMethod · 0.95
returnErrorMethod · 0.95
newTopicProducerMethod · 0.95
ConfigurationErrorTypeAlias · 0.85
transitionToMethod · 0.80
currentTxnStatusMethod · 0.80
IsAtLeastMethod · 0.80
ByteSizeMethod · 0.80
PrintlnMethod · 0.65
PrintfMethod · 0.65
DoneMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected