| 74 | } |
| 75 | |
| 76 | func HandleKafkaError(err error, forceMetadataRefresh func()) { |
| 77 | if err == nil { |
| 78 | return |
| 79 | } |
| 80 | errString := err.Error() |
| 81 | |
| 82 | switch { |
| 83 | // We're asking a broker which is no longer the leader. For a partition. We should refresh our metadata and try again. |
| 84 | case errors.Is(err, kerr.NotLeaderForPartition): |
| 85 | forceMetadataRefresh() |
| 86 | // Maybe the replica hasn't replicated the log yet, or it is no longer a replica for this partition. |
| 87 | // We should refresh and try again with a leader or replica which is up to date. |
| 88 | case errors.Is(err, kerr.ReplicaNotAvailable): |
| 89 | forceMetadataRefresh() |
| 90 | // Maybe there's an ongoing election. We should refresh our metadata and try again with a leader in the current epoch. |
| 91 | case errors.Is(err, kerr.UnknownLeaderEpoch): |
| 92 | forceMetadataRefresh() |
| 93 | case errors.Is(err, kerr.LeaderNotAvailable): |
| 94 | forceMetadataRefresh() |
| 95 | case errors.Is(err, kerr.BrokerNotAvailable): |
| 96 | forceMetadataRefresh() |
| 97 | // Topic or partition doesn't exist - metadata refresh needed to get current topology |
| 98 | case errors.Is(err, kerr.UnknownTopicOrPartition): |
| 99 | forceMetadataRefresh() |
| 100 | // Network connectivity issues - broker may have changed |
| 101 | case errors.Is(err, kerr.NetworkException): |
| 102 | forceMetadataRefresh() |
| 103 | // Coordinator moved to different broker |
| 104 | case errors.Is(err, kerr.NotCoordinator): |
| 105 | forceMetadataRefresh() |
| 106 | case strings.Contains(errString, "i/o timeout"): |
| 107 | forceMetadataRefresh() |
| 108 | case strings.Contains(errString, unknownBroker): |
| 109 | forceMetadataRefresh() |
| 110 | // The client's metadata refreshed after we called Broker(). It should already be refreshed, so we can retry immediately. |
| 111 | } |
| 112 | } |