newClaimWithRetry calls newConsumerGroupClaim, retrying transient errors so that a brief leader/metadata desync around a rebalance doesn't leave a partition permanently unclaimed for the lifetime of this session.
(topic string, partition int32, offset int64)
| 1009 | // that brief leader/metadata desync around a rebalance doesn't leave a |
| 1010 | // partition permanently unclaimed for the lifetime of this session |
| 1011 | func (s *consumerGroupSession) newClaimWithRetry(topic string, partition int32, offset int64) (*consumerGroupClaim, error) { |
| 1012 | retries := s.parent.config.Metadata.Retry.Max |
| 1013 | for { |
| 1014 | claim, err := newConsumerGroupClaim(s, topic, partition, offset) |
| 1015 | if err == nil { |
| 1016 | return claim, nil |
| 1017 | } |
| 1018 | if retries <= 0 || !isRetriableClaimError(err) { |
| 1019 | return nil, err |
| 1020 | } |
| 1021 | retries-- |
| 1022 | |
| 1023 | backoff := computeMetadataBackoff(s.parent.config, retries) |
| 1024 | Logger.Printf( |
| 1025 | "consumer-group/claim %s/%d retrying after %dms... (%d attempts remaining): %v\n", |
| 1026 | topic, partition, backoff/time.Millisecond, retries, err) |
| 1027 | |
| 1028 | select { |
| 1029 | case <-s.ctx.Done(): |
| 1030 | return nil, err |
| 1031 | case <-s.parent.closed: |
| 1032 | return nil, err |
| 1033 | case <-time.After(backoff): |
| 1034 | } |
| 1035 | |
| 1036 | // refresh leader/broker info before retrying |
| 1037 | _ = s.parent.client.RefreshMetadata(topic) |
| 1038 | } |
| 1039 | } |
| 1040 | |
| 1041 | // isRetriableClaimError reports whether err from newConsumerGroupClaim is |
| 1042 | // a transient condition worth retrying after a metadata refresh |
no test coverage detected