MCPcopy
hub / github.com/IBM/sarama / findCoordinator

Method findCoordinator

client.go:1196–1276  ·  view source on GitHub ↗
(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int)

Source from the content-addressed store, hash-verified

1194}
1195
1196func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) {
1197 retry := func(err error) (*FindCoordinatorResponse, error) {
1198 if attemptsRemaining > 0 {
1199 backoff := computeMetadataBackoff(client.conf, attemptsRemaining)
1200 attemptsRemaining--
1201 Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
1202 time.Sleep(backoff)
1203 return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining)
1204 }
1205 return nil, err
1206 }
1207
1208 brokerErrors := make([]error, 0)
1209 for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
1210 DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())
1211
1212 request := new(FindCoordinatorRequest)
1213 request.CoordinatorKey = coordinatorKey
1214 request.CoordinatorType = coordinatorType
1215
1216 // Version 1 adds KeyType.
1217 if client.conf.Version.IsAtLeast(V0_11_0_0) {
1218 request.Version = 1
1219 }
1220 // Version 2 is the same as version 1.
1221 if client.conf.Version.IsAtLeast(V2_0_0_0) {
1222 request.Version = 2
1223 }
1224 // Version 3 is the first flexible version
1225 if client.conf.Version.IsAtLeast(V2_4_0_0) {
1226 request.Version = 3
1227 }
1228
1229 response, err := broker.FindCoordinator(request)
1230 if err != nil {
1231 Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)
1232
1233 var packetEncodingError PacketEncodingError
1234 if errors.As(err, &packetEncodingError) {
1235 return nil, err
1236 } else {
1237 _ = broker.Close()
1238 brokerErrors = append(brokerErrors, err)
1239 client.deregisterBroker(broker)
1240 continue
1241 }
1242 }
1243
1244 if errors.Is(response.Err, ErrNoError) {
1245 DebugLogger.Printf("client/coordinator coordinator for %s is #%d (%s)\n", coordinatorKey, response.Coordinator.ID(), response.Coordinator.Addr())
1246 return response, nil
1247 } else if errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable) {
1248 Logger.Printf("client/coordinator coordinator for %s is not available\n", coordinatorKey)
1249
1250 // This is very ugly, but this scenario will only happen once per cluster.
1251 // The __consumer_offsets topic only has to be created one time.
1252 // The number of partitions not configurable, but partition 0 should always exist.
1253 if _, err := client.Leader("__consumer_offsets", 0); err != nil {

Callers 2

RefreshCoordinatorMethod · 0.95

Calls 14

LeastLoadedBrokerMethod · 0.95
deregisterBrokerMethod · 0.95
LeaderMethod · 0.95
resurrectDeadBrokersMethod · 0.95
computeMetadataBackoffFunction · 0.85
WrapFunction · 0.85
IsAtLeastMethod · 0.80
FindCoordinatorMethod · 0.80
IsMethod · 0.80
IDMethod · 0.80
PrintfMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected