Singleton; dispatches messages by topic.
()
| 585 | // singleton |
| 586 | // dispatches messages by topic |
| 587 | func (p *asyncProducer) dispatcher() { |
| 588 | handlers := make(map[string]chan<- *ProducerMessage) |
| 589 | shuttingDown := false |
| 590 | |
| 591 | for msg := range p.input { |
| 592 | if msg == nil { |
| 593 | Logger.Println("Something tried to send a nil message, it was ignored.") |
| 594 | continue |
| 595 | } |
| 596 | |
| 597 | if msg.flags&endtxn != 0 { |
| 598 | var err error |
| 599 | if msg.flags&committxn != 0 { |
| 600 | err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil) |
| 601 | } else { |
| 602 | err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil) |
| 603 | } |
| 604 | if err != nil { |
| 605 | Logger.Printf("producer/txnmgr unable to end transaction %s", err) |
| 606 | } |
| 607 | p.inFlight.Done() |
| 608 | continue |
| 609 | } |
| 610 | |
| 611 | if msg.flags&shutdown != 0 { |
| 612 | shuttingDown = true |
| 613 | p.inFlight.Done() |
| 614 | continue |
| 615 | } |
| 616 | |
| 617 | if msg.retries == 0 { |
| 618 | if shuttingDown { |
| 619 | // we can't just call returnError here because that decrements the wait group, |
| 620 | // which hasn't been incremented yet for this message, and shouldn't be |
| 621 | pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown} |
| 622 | if p.conf.Producer.Return.Errors { |
| 623 | p.errors <- pErr |
| 624 | } else { |
| 625 | Logger.Println(pErr) |
| 626 | } |
| 627 | continue |
| 628 | } |
| 629 | p.inFlight.Add(1) |
| 630 | // Ignore retried messages; they are already in the txn. |
| 631 | // Can't produce new record when transaction is not started. |
| 632 | if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 { |
| 633 | Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction) |
| 634 | p.returnError(msg, ErrTransactionNotReady) |
| 635 | continue |
| 636 | } |
| 637 | } |
| 638 | |
| 639 | for _, interceptor := range p.conf.Producer.Interceptors { |
| 640 | msg.safelyApplyInterceptor(interceptor) |
| 641 | } |
| 642 | |
| 643 | version := 1 |
| 644 | if p.conf.Version.IsAtLeast(V0_11_0_0) { |
nothing calls this directly
no test coverage detected