(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int)
| 1194 | } |
| 1195 | |
| 1196 | func (client *client) findCoordinator(coordinatorKey string, coordinatorType CoordinatorType, attemptsRemaining int) (*FindCoordinatorResponse, error) { |
| 1197 | retry := func(err error) (*FindCoordinatorResponse, error) { |
| 1198 | if attemptsRemaining > 0 { |
| 1199 | backoff := computeMetadataBackoff(client.conf, attemptsRemaining) |
| 1200 | attemptsRemaining-- |
| 1201 | Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining) |
| 1202 | time.Sleep(backoff) |
| 1203 | return client.findCoordinator(coordinatorKey, coordinatorType, attemptsRemaining) |
| 1204 | } |
| 1205 | return nil, err |
| 1206 | } |
| 1207 | |
| 1208 | brokerErrors := make([]error, 0) |
| 1209 | for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() { |
| 1210 | DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr()) |
| 1211 | |
| 1212 | request := new(FindCoordinatorRequest) |
| 1213 | request.CoordinatorKey = coordinatorKey |
| 1214 | request.CoordinatorType = coordinatorType |
| 1215 | |
| 1216 | // Version 1 adds KeyType. |
| 1217 | if client.conf.Version.IsAtLeast(V0_11_0_0) { |
| 1218 | request.Version = 1 |
| 1219 | } |
| 1220 | // Version 2 is the same as version 1. |
| 1221 | if client.conf.Version.IsAtLeast(V2_0_0_0) { |
| 1222 | request.Version = 2 |
| 1223 | } |
| 1224 | // Version 3 is the first flexible version |
| 1225 | if client.conf.Version.IsAtLeast(V2_4_0_0) { |
| 1226 | request.Version = 3 |
| 1227 | } |
| 1228 | |
| 1229 | response, err := broker.FindCoordinator(request) |
| 1230 | if err != nil { |
| 1231 | Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err) |
| 1232 | |
| 1233 | var packetEncodingError PacketEncodingError |
| 1234 | if errors.As(err, &packetEncodingError) { |
| 1235 | return nil, err |
| 1236 | } else { |
| 1237 | _ = broker.Close() |
| 1238 | brokerErrors = append(brokerErrors, err) |
| 1239 | client.deregisterBroker(broker) |
| 1240 | continue |
| 1241 | } |
| 1242 | } |
| 1243 | |
| 1244 | if errors.Is(response.Err, ErrNoError) { |
| 1245 | DebugLogger.Printf("client/coordinator coordinator for %s is #%d (%s)\n", coordinatorKey, response.Coordinator.ID(), response.Coordinator.Addr()) |
| 1246 | return response, nil |
| 1247 | } else if errors.Is(response.Err, ErrConsumerCoordinatorNotAvailable) { |
| 1248 | Logger.Printf("client/coordinator coordinator for %s is not available\n", coordinatorKey) |
| 1249 | |
| 1250 | // This is very ugly, but this scenario will only happen once per cluster. |
| 1251 | // The __consumer_offsets topic only has to be created one time. |
| 1252 | // The number of partitions is not configurable, but partition 0 should always exist. |
| 1253 | if _, err := client.Leader("__consumer_offsets", 0); err != nil { |
no test coverage detected