send EndTxn request with commit flag. (true when committing false otherwise)
(commit bool)
| 634 | |
| 635 | // send EndTxn request with commit flag. (true when committing false otherwise) |
| 636 | func (t *transactionManager) endTxn(commit bool) error { |
| 637 | attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| 638 | exec := func(run func() (bool, error), err error) error { |
| 639 | for attemptsRemaining >= 0 { |
| 640 | var retry bool |
| 641 | retry, err = run() |
| 642 | if !retry { |
| 643 | return err |
| 644 | } |
| 645 | backoff := t.computeBackoff(attemptsRemaining) |
| 646 | Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| 647 | t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| 648 | time.Sleep(backoff) |
| 649 | attemptsRemaining-- |
| 650 | } |
| 651 | return err |
| 652 | } |
| 653 | return exec(func() (bool, error) { |
| 654 | coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| 655 | if err != nil { |
| 656 | return true, err |
| 657 | } |
| 658 | request := &EndTxnRequest{ |
| 659 | TransactionalID: t.transactionalID, |
| 660 | ProducerEpoch: t.producerEpoch, |
| 661 | ProducerID: t.producerID, |
| 662 | TransactionResult: commit, |
| 663 | } |
| 664 | if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| 665 | // Version 2 adds the support for new error code PRODUCER_FENCED. |
| 666 | request.Version = 2 |
| 667 | } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| 668 | // Version 1 is the same as version 0. |
| 669 | request.Version = 1 |
| 670 | } |
| 671 | response, err := coordinator.EndTxn(request) |
| 672 | if err != nil { |
| 673 | // Always retry on network error |
| 674 | _ = coordinator.Close() |
| 675 | _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| 676 | return true, err |
| 677 | } |
| 678 | if response == nil { |
| 679 | return true, ErrTxnUnableToParseResponse |
| 680 | } |
| 681 | if response.Err == ErrNoError { |
| 682 | DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n", |
| 683 | t.transactionalID, response) |
| 684 | return false, t.completeTransaction() |
| 685 | } |
| 686 | switch response.Err { |
| 687 | // Need to refresh coordinator |
| 688 | case ErrConsumerCoordinatorNotAvailable: |
| 689 | fallthrough |
| 690 | case ErrNotCoordinatorForConsumer: |
| 691 | _ = coordinator.Close() |
| 692 | _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| 693 | fallthrough |