MCPcopy
hub / github.com/grafana/tempo / HandleKafkaError

Function HandleKafkaError

pkg/ingest/util.go:76–112  ·  view source on GitHub ↗
(err error, forceMetadataRefresh func())

Source from the content-addressed store, hash-verified

74}
75
76func HandleKafkaError(err error, forceMetadataRefresh func()) {
77 if err == nil {
78 return
79 }
80 errString := err.Error()
81
82 switch {
83 // We're asking a broker which is no longer the leader. For a partition. We should refresh our metadata and try again.
84 case errors.Is(err, kerr.NotLeaderForPartition):
85 forceMetadataRefresh()
86 // Maybe the replica hasn't replicated the log yet, or it is no longer a replica for this partition.
87 // We should refresh and try again with a leader or replica which is up to date.
88 case errors.Is(err, kerr.ReplicaNotAvailable):
89 forceMetadataRefresh()
90 // Maybe there's an ongoing election. We should refresh our metadata and try again with a leader in the current epoch.
91 case errors.Is(err, kerr.UnknownLeaderEpoch):
92 forceMetadataRefresh()
93 case errors.Is(err, kerr.LeaderNotAvailable):
94 forceMetadataRefresh()
95 case errors.Is(err, kerr.BrokerNotAvailable):
96 forceMetadataRefresh()
97 // Topic or partition doesn't exist - metadata refresh needed to get current topology
98 case errors.Is(err, kerr.UnknownTopicOrPartition):
99 forceMetadataRefresh()
100 // Network connectivity issues - broker may have changed
101 case errors.Is(err, kerr.NetworkException):
102 forceMetadataRefresh()
103 // Coordinator moved to different broker
104 case errors.Is(err, kerr.NotCoordinator):
105 forceMetadataRefresh()
106 case strings.Contains(errString, "i/o timeout"):
107 forceMetadataRefresh()
108 case strings.Contains(errString, unknownBroker):
109 forceMetadataRefresh()
110 // The client's metadata refreshed after we called Broker(). It should already be refreshed, so we can retry immediately.
111 }
112}

Callers 4

commitOffsetMethod · 0.92
fetchPartitionsMethod · 0.92
TestHandleKafkaErrorFunction · 0.85

Calls 1

ErrorMethod · 0.65

Tested by 1

TestHandleKafkaErrorFunction · 0.68