()
| 1125 | } |
| 1126 | |
| 1127 | func (s *consumerGroupSession) heartbeatLoop() { |
| 1128 | defer close(s.hbDead) |
| 1129 | defer s.cancel(ErrSessionHeartbeatFailed) // trigger the end of the session on exit |
| 1130 | defer func() { |
| 1131 | Logger.Printf( |
| 1132 | "consumergroup/session/%s/%d heartbeat loop stopped\n", |
| 1133 | s.MemberID(), s.GenerationID()) |
| 1134 | }() |
| 1135 | |
| 1136 | pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval) |
| 1137 | defer pause.Stop() |
| 1138 | |
| 1139 | retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff) |
| 1140 | defer retryBackoff.Stop() |
| 1141 | |
| 1142 | retries := s.parent.config.Metadata.Retry.Max |
| 1143 | for { |
| 1144 | coordinator, err := s.parent.client.Coordinator(s.parent.groupID) |
| 1145 | if err != nil { |
| 1146 | if retries <= 0 { |
| 1147 | s.parent.handleError(err, "", -1) |
| 1148 | s.cancel(err) |
| 1149 | return |
| 1150 | } |
| 1151 | retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff) |
| 1152 | select { |
| 1153 | case <-s.hbDying: |
| 1154 | return |
| 1155 | case <-retryBackoff.C: |
| 1156 | retries-- |
| 1157 | } |
| 1158 | continue |
| 1159 | } |
| 1160 | |
| 1161 | resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID) |
| 1162 | if err != nil { |
| 1163 | _ = coordinator.Close() |
| 1164 | |
| 1165 | if retries <= 0 { |
| 1166 | s.parent.handleError(err, "", -1) |
| 1167 | s.cancel(err) |
| 1168 | return |
| 1169 | } |
| 1170 | |
| 1171 | retries-- |
| 1172 | continue |
| 1173 | } |
| 1174 | |
| 1175 | switch err := resp.Err; err { |
| 1176 | case ErrNoError: |
| 1177 | retries = s.parent.config.Metadata.Retry.Max |
| 1178 | case ErrRebalanceInProgress: |
| 1179 | retries = s.parent.config.Metadata.Retry.Max |
| 1180 | s.cancel(err) |
| 1181 | case ErrUnknownMemberId, ErrIllegalGeneration: |
| 1182 | s.cancel(err) |
| 1183 | return |
| 1184 | case ErrFencedInstancedId: |
no test coverage detected