send txnmgnr save offsets to transaction coordinator.
(offsets topicPartitionOffsets, groupMetadata *ConsumerGroupMetadata)
| 301 | |
| 302 | // send txnmgnr save offsets to transaction coordinator. |
| 303 | func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupMetadata *ConsumerGroupMetadata) (topicPartitionOffsets, error) { |
| 304 | groupId := groupMetadata.GroupID |
| 305 | // First AddOffsetsToTxn |
| 306 | attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max |
| 307 | exec := func(run func() (bool, error), err error) error { |
| 308 | for attemptsRemaining >= 0 { |
| 309 | var retry bool |
| 310 | retry, err = run() |
| 311 | if !retry { |
| 312 | return err |
| 313 | } |
| 314 | backoff := t.computeBackoff(attemptsRemaining) |
| 315 | Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n", |
| 316 | t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err) |
| 317 | time.Sleep(backoff) |
| 318 | attemptsRemaining-- |
| 319 | } |
| 320 | return err |
| 321 | } |
| 322 | lastError := exec(func() (bool, error) { |
| 323 | coordinator, err := t.client.TransactionCoordinator(t.transactionalID) |
| 324 | if err != nil { |
| 325 | return true, err |
| 326 | } |
| 327 | request := &AddOffsetsToTxnRequest{ |
| 328 | TransactionalID: t.transactionalID, |
| 329 | ProducerEpoch: t.producerEpoch, |
| 330 | ProducerID: t.producerID, |
| 331 | GroupID: groupId, |
| 332 | } |
| 333 | if t.client.Config().Version.IsAtLeast(V2_7_0_0) { |
| 334 | // Version 2 adds the support for new error code PRODUCER_FENCED. |
| 335 | request.Version = 2 |
| 336 | } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) { |
| 337 | // Version 1 is the same as version 0. |
| 338 | request.Version = 1 |
| 339 | } |
| 340 | response, err := coordinator.AddOffsetsToTxn(request) |
| 341 | if err != nil { |
| 342 | // If an error occurred try to refresh current transaction coordinator. |
| 343 | _ = coordinator.Close() |
| 344 | _ = t.client.RefreshTransactionCoordinator(t.transactionalID) |
| 345 | return true, err |
| 346 | } |
| 347 | if response == nil { |
| 348 | // If no response is returned just retry. |
| 349 | return true, ErrTxnUnableToParseResponse |
| 350 | } |
| 351 | if response.Err == ErrNoError { |
| 352 | DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n", |
| 353 | t.transactionalID, groupId, response) |
| 354 | // If no error, just exit. |
| 355 | return false, nil |
| 356 | } |
| 357 | switch response.Err { |
| 358 | case ErrConsumerCoordinatorNotAvailable: |
| 359 | fallthrough |
| 360 | case ErrNotCoordinatorForConsumer: |