MCPcopy
hub / github.com/IBM/sarama / endTxn

Method endTxn

transaction_manager.go:636–708  ·  view source on GitHub ↗

send EndTxn request with commit flag. (true when committing false otherwise)

(commit bool)

Source from the content-addressed store, hash-verified

634
635// send EndTxn request with commit flag. (true when committing false otherwise)
636func (t *transactionManager) endTxn(commit bool) error {
637 attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
638 exec := func(run func() (bool, error), err error) error {
639 for attemptsRemaining >= 0 {
640 var retry bool
641 retry, err = run()
642 if !retry {
643 return err
644 }
645 backoff := t.computeBackoff(attemptsRemaining)
646 Logger.Printf("txnmgr/endtxn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
647 t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
648 time.Sleep(backoff)
649 attemptsRemaining--
650 }
651 return err
652 }
653 return exec(func() (bool, error) {
654 coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
655 if err != nil {
656 return true, err
657 }
658 request := &EndTxnRequest{
659 TransactionalID: t.transactionalID,
660 ProducerEpoch: t.producerEpoch,
661 ProducerID: t.producerID,
662 TransactionResult: commit,
663 }
664 if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
665 // Version 2 adds the support for new error code PRODUCER_FENCED.
666 request.Version = 2
667 } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
668 // Version 1 is the same as version 0.
669 request.Version = 1
670 }
671 response, err := coordinator.EndTxn(request)
672 if err != nil {
673 // Always retry on network error
674 _ = coordinator.Close()
675 _ = t.client.RefreshTransactionCoordinator(t.transactionalID)
676 return true, err
677 }
678 if response == nil {
679 return true, ErrTxnUnableToParseResponse
680 }
681 if response.Err == ErrNoError {
682 DebugLogger.Printf("txnmgr/endtxn [%s] successful to end txn %+v\n",
683 t.transactionalID, response)
684 return false, t.completeTransaction()
685 }
686 switch response.Err {
687 // Need to refresh coordinator
688 case ErrConsumerCoordinatorNotAvailable:
689 fallthrough
690 case ErrNotCoordinatorForConsumer:
691 _ = coordinator.Close()
692 _ = t.client.RefreshTransactionCoordinator(t.transactionalID)
693 fallthrough

Callers 2

finishTransactionMethod · 0.95
TestEndTxnFunction · 0.80

Calls 11

computeBackoffMethod · 0.95
completeTransactionMethod · 0.95
transitionToMethod · 0.95
IsAtLeastMethod · 0.80
EndTxnMethod · 0.80
ConfigMethod · 0.65
PrintfMethod · 0.65
CloseMethod · 0.65

Tested by 1

TestEndTxnFunction · 0.64