()
| 275 | } |
| 276 | |
| 277 | func (client *client) InitProducerID() (*InitProducerIDResponse, error) { |
| 278 | // FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go? |
| 279 | brokerErrors := make([]error, 0) |
| 280 | for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() { |
| 281 | request := &InitProducerIDRequest{} |
| 282 | |
| 283 | if client.conf.Version.IsAtLeast(V2_7_0_0) { |
| 284 | // Version 4 adds the support for new error code PRODUCER_FENCED. |
| 285 | request.Version = 4 |
| 286 | } else if client.conf.Version.IsAtLeast(V2_5_0_0) { |
| 287 | // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error |
| 288 | request.Version = 3 |
| 289 | } else if client.conf.Version.IsAtLeast(V2_4_0_0) { |
| 290 | // Version 2 is the first flexible version. |
| 291 | request.Version = 2 |
| 292 | } else if client.conf.Version.IsAtLeast(V2_0_0_0) { |
| 293 | // Version 1 is the same as version 0. |
| 294 | request.Version = 1 |
| 295 | } |
| 296 | |
| 297 | response, err := broker.InitProducerID(request) |
| 298 | if err == nil { |
| 299 | return response, nil |
| 300 | } else { |
| 301 | // some error, remove that broker and try again |
| 302 | Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err) |
| 303 | _ = broker.Close() |
| 304 | brokerErrors = append(brokerErrors, err) |
| 305 | client.deregisterBroker(broker) |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | return nil, Wrap(ErrOutOfBrokers, brokerErrors...) |
| 310 | } |
| 311 | |
| 312 | func (client *client) Close() error { |
| 313 | if client.Closed() { |
nothing calls this directly
no test coverage detected