MCPcopy
hub / github.com/IBM/sarama / heartbeatLoop

Method heartbeatLoop

consumer_group.go:1127–1203  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

1125}
1126
1127func (s *consumerGroupSession) heartbeatLoop() {
1128 defer close(s.hbDead)
1129 defer s.cancel(ErrSessionHeartbeatFailed) // trigger the end of the session on exit
1130 defer func() {
1131 Logger.Printf(
1132 "consumergroup/session/%s/%d heartbeat loop stopped\n",
1133 s.MemberID(), s.GenerationID())
1134 }()
1135
1136 pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
1137 defer pause.Stop()
1138
1139 retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
1140 defer retryBackoff.Stop()
1141
1142 retries := s.parent.config.Metadata.Retry.Max
1143 for {
1144 coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
1145 if err != nil {
1146 if retries <= 0 {
1147 s.parent.handleError(err, "", -1)
1148 s.cancel(err)
1149 return
1150 }
1151 retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
1152 select {
1153 case <-s.hbDying:
1154 return
1155 case <-retryBackoff.C:
1156 retries--
1157 }
1158 continue
1159 }
1160
1161 resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
1162 if err != nil {
1163 _ = coordinator.Close()
1164
1165 if retries <= 0 {
1166 s.parent.handleError(err, "", -1)
1167 s.cancel(err)
1168 return
1169 }
1170
1171 retries--
1172 continue
1173 }
1174
1175 switch err := resp.Err; err {
1176 case ErrNoError:
1177 retries = s.parent.config.Metadata.Retry.Max
1178 case ErrRebalanceInProgress:
1179 retries = s.parent.config.Metadata.Retry.Max
1180 s.cancel(err)
1181 case ErrUnknownMemberId, ErrIllegalGeneration:
1182 s.cancel(err)
1183 return
1184 case ErrFencedInstancedId:

Callers 1

newConsumerGroupSessionFunction · 0.95

Calls 8

MemberIDMethod · 0.95
GenerationIDMethod · 0.95
StopMethod · 0.80
heartbeatRequestMethod · 0.80
PrintfMethod · 0.65
CoordinatorMethod · 0.65
CloseMethod · 0.65
handleErrorMethod · 0.45

Tested by

no test coverage detected