handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
()
| 1179 | |
| 1180 | // handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed |
| 1181 | func (bc *brokerConsumer) handleResponses() { |
| 1182 | for child := range bc.subscriptions { |
| 1183 | select { |
| 1184 | case <-child.dying: |
| 1185 | bc.releaseSubscription(child) |
| 1186 | child.stopDispatcher() |
| 1187 | continue |
| 1188 | default: |
| 1189 | } |
| 1190 | |
| 1191 | result := child.responseResult |
| 1192 | child.responseResult = nil |
| 1193 | |
| 1194 | if result == nil { |
| 1195 | if preferredBroker, _, err := child.preferredBroker(); err == nil { |
| 1196 | if bc.broker.ID() != preferredBroker.ID() { |
| 1197 | // not an error but needs redispatching to consume from preferred replica |
| 1198 | Logger.Printf( |
| 1199 | "consumer/broker/%d abandoned in favor of preferred replica broker/%d\n", |
| 1200 | bc.broker.ID(), preferredBroker.ID()) |
| 1201 | child.triggerRedispatch() |
| 1202 | bc.releaseSubscription(child) |
| 1203 | } |
| 1204 | } |
| 1205 | continue |
| 1206 | } |
| 1207 | |
| 1208 | // Discard any replica preference. |
| 1209 | child.preferredReadReplica = invalidPreferredReplicaID |
| 1210 | child.preferredReadReplicaExpiry = time.Time{} |
| 1211 | |
| 1212 | if errors.Is(result, errTimedOut) { |
| 1213 | Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n", |
| 1214 | bc.broker.ID(), child.topic, child.partition) |
| 1215 | // responseFeeder already requeued this subscription onto bc.input, |
| 1216 | // so it will loop back through subscriptionManager; there is no need |
| 1217 | // to release it here |
| 1218 | delete(bc.subscriptions, child) |
| 1219 | } else if errors.Is(result, ErrOffsetOutOfRange) { |
| 1220 | // there's no point in retrying this: it will just fail the same way again, |
| 1221 | // so shut it down and force the user to choose what to do |
| 1222 | child.sendError(result) |
| 1223 | Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result) |
| 1224 | child.stopDispatcher() |
| 1225 | child.AsyncClose() |
| 1226 | bc.releaseSubscription(child) |
| 1227 | } else if errors.Is(result, ErrUnknownTopicOrPartition) || |
| 1228 | errors.Is(result, ErrNotLeaderForPartition) || |
| 1229 | errors.Is(result, ErrLeaderNotAvailable) || |
| 1230 | errors.Is(result, ErrReplicaNotAvailable) || |
| 1231 | errors.Is(result, ErrFencedLeaderEpoch) || |
| 1232 | errors.Is(result, ErrUnknownLeaderEpoch) { |
| 1233 | // not an error, but does need redispatching |
| 1234 | Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", |
| 1235 | bc.broker.ID(), child.topic, child.partition, result) |
| 1236 | child.triggerRedispatch() |
| 1237 | bc.releaseSubscription(child) |
| 1238 | } else { |
no test coverage detected