MCPcopy
hub / github.com/IBM/sarama / publishTxnPartitions

Method publishTxnPartitions

transaction_manager.go:779–915  ·  view source on GitHub ↗

Makes a request to Kafka to add a list of partitions to the current transaction.

()

Source from the content-addressed store, hash-verified

777
778// Makes a request to Kafka to add a list of partitions to the current transaction.
779func (t *transactionManager) publishTxnPartitions() error {
780 t.partitionInTxnLock.Lock()
781 defer t.partitionInTxnLock.Unlock()
782
783 if t.currentTxnStatus()&ProducerTxnFlagInError != 0 {
784 return t.lastError
785 }
786
787 if len(t.pendingPartitionsInCurrentTxn) == 0 {
788 return nil
789 }
790
791 // Remove the partitions from the pending set regardless of the result. We use the presence
792 // of partitions in the pending set to know when it is not safe to send batches. However, if
793 // the partitions failed to be added and we enter an error state, we expect the batches to be
794 // aborted anyway. In this case, we must be able to continue sending the batches which are in
795 // retry for partitions that were successfully added.
796 removeAllPartitionsOnFatalOrAbortedError := func() {
797 t.pendingPartitionsInCurrentTxn = topicPartitionSet{}
798 }
799
800 // We only want to reduce the backoff when retrying the first AddPartition which errored out due to a
801 // CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and
802 // we don't want to wait too long before trying to start the new one.
803 //
804 // This is only a temporary fix, the long term solution is being tracked in
805 // https://issues.apache.org/jira/browse/KAFKA-5482
806 retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff
807 computeBackoff := func(attemptsRemaining int) time.Duration {
808 if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil {
809 maxRetries := t.client.Config().Producer.Transaction.Retry.Max
810 retries := maxRetries - attemptsRemaining
811 return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries)
812 }
813 return retryBackoff
814 }
815 attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
816
817 exec := func(run func() (bool, error), err error) error {
818 for attemptsRemaining >= 0 {
819 var retry bool
820 retry, err = run()
821 if !retry {
822 return err
823 }
824 backoff := computeBackoff(attemptsRemaining)
825 Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err)
826 time.Sleep(backoff)
827 attemptsRemaining--
828 }
829 return err
830 }
831 return exec(func() (bool, error) {
832 coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
833 if err != nil {
834 return true, err
835 }
836 request := &AddPartitionsToTxnRequest{

Callers 2

newBrokerProducerMethod · 0.80

Calls 12

currentTxnStatusMethod · 0.95
transitionToMethod · 0.95
WrapFunction · 0.85
IsAtLeastMethod · 0.80
AddPartitionsToTxnMethod · 0.80
ConfigMethod · 0.65
PrintfMethod · 0.65
CloseMethod · 0.65
mapToRequestMethod · 0.45

Tested by 1