()
| 491 | } |
| 492 | |
| 493 | func (t *transactionManager) initProducerId() (int64, int16, error) { |
| 494 | isEpochBump := false |
| 495 | |
| 496 | req := &InitProducerIDRequest{} |
| 497 | if t.isTransactional() { |
| 498 | req.TransactionalID = &t.transactionalID |
| 499 | req.TransactionTimeout = t.transactionTimeout |
| 500 | } |
| 501 | |
| 502 | if t.client.Config().Version.IsAtLeast(V2_5_0_0) { |
| 503 | if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| 504 | // Version 4 adds support for the new error code PRODUCER_FENCED. |
| 505 | req.Version = 4 |
| 506 | } else { |
| 507 | // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try |
| 508 | // to resume after an INVALID_PRODUCER_EPOCH error |
| 509 | req.Version = 3 |
| 510 | } |
| 511 | isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch |
| 512 | t.coordinatorSupportsBumpingEpoch = true |
| 513 | req.ProducerID = t.producerID |
| 514 | req.ProducerEpoch = t.producerEpoch |
| 515 | } else if t.client.Config().Version.IsAtLeast(V2_4_0_0) { |
| 516 | // Version 2 is the first flexible version. |
| 517 | req.Version = 2 |
| 518 | } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| 519 | // Version 1 is the same as version 0. |
| 520 | req.Version = 1 |
| 521 | } |
| 522 | |
| 523 | if isEpochBump { |
| 524 | err := t.transitionTo(ProducerTxnFlagInitializing, nil) |
| 525 | if err != nil { |
| 526 | return -1, -1, err |
| 527 | } |
| 528 | DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId for the first time in order to acquire a producer ID\n", |
| 529 | t.transactionalID) |
| 530 | } else { |
| 531 | DebugLogger.Printf("txnmgr/init-producer-id [%s] invoking InitProducerId with current producer ID %d and epoch %d in order to bump the epoch\n", |
| 532 | t.transactionalID, t.producerID, t.producerEpoch) |
| 533 | } |
| 534 | |
| 535 | attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| 536 | exec := func(run func() (int64, int16, bool, error), err error) (int64, int16, error) { |
| 537 | pid := int64(-1) |
| 538 | pepoch := int16(-1) |
| 539 | for attemptsRemaining >= 0 { |
| 540 | var retry bool |
| 541 | pid, pepoch, retry, err = run() |
| 542 | if !retry { |
| 543 | return pid, pepoch, err |
| 544 | } |
| 545 | backoff := t.computeBackoff(attemptsRemaining) |
| 546 | Logger.Printf("txnmgr/init-producer-id [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| 547 | t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| 548 | time.Sleep(backoff) |
| 549 | attemptsRemaining-- |
| 550 | } |
no test coverage detected