MCPcopy
hub / github.com/IBM/sarama / newClaimWithRetry

Method newClaimWithRetry

consumer_group.go:1011–1039  ·  view source on GitHub ↗

newClaimWithRetry calls newConsumerGroupClaim, retrying transient errors so that brief leader/metadata desync around a rebalance doesn't leave a partition permanently unclaimed for the lifetime of this session

(topic string, partition int32, offset int64)

Source from the content-addressed store, hash-verified

1009// that brief leader/metadata desync around a rebalance doesn't leave a
1010// partition permanently unclaimed for the lifetime of this session
1011func (s *consumerGroupSession) newClaimWithRetry(topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
1012 retries := s.parent.config.Metadata.Retry.Max
1013 for {
1014 claim, err := newConsumerGroupClaim(s, topic, partition, offset)
1015 if err == nil {
1016 return claim, nil
1017 }
1018 if retries <= 0 || !isRetriableClaimError(err) {
1019 return nil, err
1020 }
1021 retries--
1022
1023 backoff := computeMetadataBackoff(s.parent.config, retries)
1024 Logger.Printf(
1025 "consumer-group/claim %s/%d retrying after %dms... (%d attempts remaining): %v\n",
1026 topic, partition, backoff/time.Millisecond, retries, err)
1027
1028 select {
1029 case <-s.ctx.Done():
1030 return nil, err
1031 case <-s.parent.closed:
1032 return nil, err
1033 case <-time.After(backoff):
1034 }
1035
1036 // refresh leader/broker info before retrying
1037 _ = s.parent.client.RefreshMetadata(topic)
1038 }
1039}
1040
1041// isRetriableClaimError reports whether err from newConsumerGroupClaim is
1042// a transient condition worth retrying after a metadata refresh

Callers 1

consumeMethod · 0.95

Calls 6

newConsumerGroupClaimFunction · 0.85
isRetriableClaimErrorFunction · 0.85
computeMetadataBackoffFunction · 0.85
PrintfMethod · 0.65
DoneMethod · 0.65
RefreshMetadataMethod · 0.65

Tested by

no test coverage detected