MCPcopy
hub / github.com/IBM/sarama / handleResponses

Method handleResponses

consumer.go:1181–1247  ·  view source on GitHub ↗

handleResponses handles the response codes left for us by our subscriptions, and abandons subscriptions that have been closed.

()

Source from the content-addressed store, hash-verified

1179
1180// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
1181func (bc *brokerConsumer) handleResponses() {
1182 for child := range bc.subscriptions {
1183 select {
1184 case <-child.dying:
1185 bc.releaseSubscription(child)
1186 child.stopDispatcher()
1187 continue
1188 default:
1189 }
1190
1191 result := child.responseResult
1192 child.responseResult = nil
1193
1194 if result == nil {
1195 if preferredBroker, _, err := child.preferredBroker(); err == nil {
1196 if bc.broker.ID() != preferredBroker.ID() {
1197 // not an error but needs redispatching to consume from preferred replica
1198 Logger.Printf(
1199 "consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
1200 bc.broker.ID(), preferredBroker.ID())
1201 child.triggerRedispatch()
1202 bc.releaseSubscription(child)
1203 }
1204 }
1205 continue
1206 }
1207
1208 // Discard any replica preference.
1209 child.preferredReadReplica = invalidPreferredReplicaID
1210 child.preferredReadReplicaExpiry = time.Time{}
1211
1212 if errors.Is(result, errTimedOut) {
1213 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
1214 bc.broker.ID(), child.topic, child.partition)
1215 // responseFeeder already requeued this subscription onto bc.input
1216 // so it will loop back through subscriptionManager so no need to
1217 // release it here
1218 delete(bc.subscriptions, child)
1219 } else if errors.Is(result, ErrOffsetOutOfRange) {
1220 // there's no point in retrying this it will just fail the same way again
1221 // shut it down and force the user to choose what to do
1222 child.sendError(result)
1223 Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
1224 child.stopDispatcher()
1225 child.AsyncClose()
1226 bc.releaseSubscription(child)
1227 } else if errors.Is(result, ErrUnknownTopicOrPartition) ||
1228 errors.Is(result, ErrNotLeaderForPartition) ||
1229 errors.Is(result, ErrLeaderNotAvailable) ||
1230 errors.Is(result, ErrReplicaNotAvailable) ||
1231 errors.Is(result, ErrFencedLeaderEpoch) ||
1232 errors.Is(result, ErrUnknownLeaderEpoch) {
1233 // not an error, but does need redispatching
1234 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
1235 bc.broker.ID(), child.topic, child.partition, result)
1236 child.triggerRedispatch()
1237 bc.releaseSubscription(child)
1238 } else {

Callers 1

subscriptionConsumer · Method · 0.95

Calls 9

releaseSubscription · Method · 0.95
stopDispatcher · Method · 0.80
preferredBroker · Method · 0.80
ID · Method · 0.80
triggerRedispatch · Method · 0.80
Is · Method · 0.80
sendError · Method · 0.80
Printf · Method · 0.65
AsyncClose · Method · 0.65

Tested by

no test coverage detected