Makes a request to kafka to add a list of partitions ot the current transaction.
()
| 777 | |
| 778 | // Makes a request to kafka to add a list of partitions ot the current transaction. |
| 779 | func (t *transactionManager) publishTxnPartitions() error { |
| 780 | t.partitionInTxnLock.Lock() |
| 781 | defer t.partitionInTxnLock.Unlock() |
| 782 | |
| 783 | if t.currentTxnStatus()&ProducerTxnFlagInError != 0 { |
| 784 | return t.lastError |
| 785 | } |
| 786 | |
| 787 | if len(t.pendingPartitionsInCurrentTxn) == 0 { |
| 788 | return nil |
| 789 | } |
| 790 | |
| 791 | // Remove the partitions from the pending set regardless of the result. We use the presence |
| 792 | // of partitions in the pending set to know when it is not safe to send batches. However, if |
| 793 | // the partitions failed to be added and we enter an error state, we expect the batches to be |
| 794 | // aborted anyway. In this case, we must be able to continue sending the batches which are in |
| 795 | // retry for partitions that were successfully added. |
| 796 | removeAllPartitionsOnFatalOrAbortedError := func() { |
| 797 | t.pendingPartitionsInCurrentTxn = topicPartitionSet{} |
| 798 | } |
| 799 | |
| 800 | // We only want to reduce the backoff when retrying the first AddPartition which errored out due to a |
| 801 | // CONCURRENT_TRANSACTIONS error since this means that the previous transaction is still completing and |
| 802 | // we don't want to wait too long before trying to start the new one. |
| 803 | // |
| 804 | // This is only a temporary fix, the long term solution is being tracked in |
| 805 | // https://issues.apache.org/jira/browse/KAFKA-5482 |
| 806 | retryBackoff := t.client.Config().Producer.Transaction.Retry.Backoff |
| 807 | computeBackoff := func(attemptsRemaining int) time.Duration { |
| 808 | if t.client.Config().Producer.Transaction.Retry.BackoffFunc != nil { |
| 809 | maxRetries := t.client.Config().Producer.Transaction.Retry.Max |
| 810 | retries := maxRetries - attemptsRemaining |
| 811 | return t.client.Config().Producer.Transaction.Retry.BackoffFunc(retries, maxRetries) |
| 812 | } |
| 813 | return retryBackoff |
| 814 | } |
| 815 | attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| 816 | |
| 817 | exec := func(run func() (bool, error), err error) error { |
| 818 | for attemptsRemaining >= 0 { |
| 819 | var retry bool |
| 820 | retry, err = run() |
| 821 | if !retry { |
| 822 | return err |
| 823 | } |
| 824 | backoff := computeBackoff(attemptsRemaining) |
| 825 | Logger.Printf("txnmgr/add-partition-to-txn retrying after %dms... (%d attempts remaining) (%s)\n", backoff/time.Millisecond, attemptsRemaining, err) |
| 826 | time.Sleep(backoff) |
| 827 | attemptsRemaining-- |
| 828 | } |
| 829 | return err |
| 830 | } |
| 831 | return exec(func() (bool, error) { |
| 832 | coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| 833 | if err != nil { |
| 834 | return true, err |
| 835 | } |
| 836 | request := &AddPartitionsToTxnRequest{ |