MCPcopy
hub / github.com/IBM/sarama / publishOffsetsToTxn

Method publishOffsetsToTxn

transaction_manager.go:303–491  ·  view source on GitHub ↗

Sends the offsets held by the transaction manager (txnmgr) to the transaction coordinator so that they are saved as part of the transaction.

(offsets topicPartitionOffsets, groupMetadata *ConsumerGroupMetadata)

Source from the content-addressed store, hash-verified

301
302// send txnmgnr save offsets to transaction coordinator.
303func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets, groupMetadata *ConsumerGroupMetadata) (topicPartitionOffsets, error) {
304 groupId := groupMetadata.GroupID
305 // First AddOffsetsToTxn
306 attemptsRemaining := t.client.Config().Producer.Transaction.Retry.Max
307 exec := func(run func() (bool, error), err error) error {
308 for attemptsRemaining >= 0 {
309 var retry bool
310 retry, err = run()
311 if !retry {
312 return err
313 }
314 backoff := t.computeBackoff(attemptsRemaining)
315 Logger.Printf("txnmgr/add-offset-to-txn [%s] retrying after %dms... (%d attempts remaining) (%s)\n",
316 t.transactionalID, backoff/time.Millisecond, attemptsRemaining, err)
317 time.Sleep(backoff)
318 attemptsRemaining--
319 }
320 return err
321 }
322 lastError := exec(func() (bool, error) {
323 coordinator, err := t.client.TransactionCoordinator(t.transactionalID)
324 if err != nil {
325 return true, err
326 }
327 request := &AddOffsetsToTxnRequest{
328 TransactionalID: t.transactionalID,
329 ProducerEpoch: t.producerEpoch,
330 ProducerID: t.producerID,
331 GroupID: groupId,
332 }
333 if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
334 // Version 2 adds the support for new error code PRODUCER_FENCED.
335 request.Version = 2
336 } else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
337 // Version 1 is the same as version 0.
338 request.Version = 1
339 }
340 response, err := coordinator.AddOffsetsToTxn(request)
341 if err != nil {
342 // If an error occurred try to refresh current transaction coordinator.
343 _ = coordinator.Close()
344 _ = t.client.RefreshTransactionCoordinator(t.transactionalID)
345 return true, err
346 }
347 if response == nil {
348 // If no response is returned just retry.
349 return true, ErrTxnUnableToParseResponse
350 }
351 if response.Err == ErrNoError {
352 DebugLogger.Printf("txnmgr/add-offset-to-txn [%s] successful add-offset-to-txn with group %s %+v\n",
353 t.transactionalID, groupId, response)
354 // If no error, just exit.
355 return false, nil
356 }
357 switch response.Err {
358 case ErrConsumerCoordinatorNotAvailable:
359 fallthrough
360 case ErrNotCoordinatorForConsumer:

Callers 4

finishTransactionMethod · 0.95
TestAddOffsetsToTxnFunction · 0.80
TestTxnOffsetsCommitFunction · 0.80

Calls 15

computeBackoffMethod · 0.95
transitionToMethod · 0.95
WrapFunction · 0.85
IsAtLeastMethod · 0.80
TxnOffsetCommitMethod · 0.80
ConfigMethod · 0.65
PrintfMethod · 0.65
AddOffsetsToTxnMethod · 0.65
CloseMethod · 0.65

Tested by 3

TestAddOffsetsToTxnFunction · 0.64
TestTxnOffsetsCommitFunction · 0.64