MCP · copy
hub / github.com/IBM/sarama / initProducerId

Method initProducerId

transaction_manager.go:493–600  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

491}
492
493func (t *transactionManager) initProducerId() (int64, int16, error) {
494 isEpochBump := false
495
496 req := &InitProducerIDRequest{}
497 if t.isTransactional() {
498 req.TransactionalID = &t.transactionalID
499 req.TransactionTimeout = t.transactionTimeout
500 }
501
502 if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
503 if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
504 // Version 4 adds the support for new error code PRODUCER_FENCED.
505 req.Version = 4
506 } else {
507 // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
508 // to resume after an INVALID_PRODUCER_EPOCH error
509 req.Version = 3
510 }
511 isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
512 t.coordinatorSupportsBumpingEpoch = true
513 req.ProducerID = t.producerID
514 req.ProducerEpoch = t.producerEpoch
515 } else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
516 // Version 2 is the first flexible version.
517 req.Version = 2
518 } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
519 // Version 1 is the same as version 0.
520 req.Version = 1
521 }
522
523 if isEpochBump {
524 err := t.transitionTo(ProducerTxnFlagInitializing, nil)
525 if err != nil {
526 return -1, -1, err
527 }
528 DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n",
529 t.transactionalID)
530 } else {
531 DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n",
532 t.transactionalID, t.producerID, t.producerEpoch)
533 }
534
535 attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
536 exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) {
537 pid := int64(-1)
538 pepoch := int16(-1)
539 for attemptsRemaining >= 0 {
540 var retry bool
541 pid, pepoch, retry, err = run()
542 if !retry {
543 return pid, pepoch, err
544 }
545 backoff := t.computeBackoff(attemptsRemaining)
546 Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
547 t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
548 time.Sleep(backoff)
549 attemptsRemaining--
550 }

Callers 2

newTransactionManager (Function) · 0.95

Calls 11

isTransactional (Method) · 0.95
transitionTo (Method) · 0.95
computeBackoff (Method) · 0.95
InitProducerID (Method) · 0.95
Close (Method) · 0.95
IsAtLeast (Method) · 0.80
Config (Method) · 0.65
Printf (Method) · 0.65
LeastLoadedBroker (Method) · 0.65

Tested by

no test coverage detected