MCPcopy
hub / github.com/IBM/sarama / InitProducerID

Method InitProducerID

client.go:277–310  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

275}
276
277func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
278 // FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
279 brokerErrors := make([]error, 0)
280 for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
281 request := &InitProducerIDRequest{}
282
283 if client.conf.Version.IsAtLeast(V2_7_0_0) {
284 // Version 4 adds the support for new error code PRODUCER_FENCED.
285 request.Version = 4
286 } else if client.conf.Version.IsAtLeast(V2_5_0_0) {
287 // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
288 request.Version = 3
289 } else if client.conf.Version.IsAtLeast(V2_4_0_0) {
290 // Version 2 is the first flexible version.
291 request.Version = 2
292 } else if client.conf.Version.IsAtLeast(V2_0_0_0) {
293 // Version 1 is the same as version 0.
294 request.Version = 1
295 }
296
297 response, err := broker.InitProducerID(request)
298 if err == nil {
299 return response, nil
300 } else {
301 // some error, remove that broker and try again
302 Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
303 _ = broker.Close()
304 brokerErrors = append(brokerErrors, err)
305 client.deregisterBroker(broker)
306 }
307 }
308
309 return nil, Wrap(ErrOutOfBrokers, brokerErrors...)
310}
311
312func (client *client) Close() error {
313 if client.Closed() {

Callers

nothing calls this directly

Calls 8

LeastLoadedBrokerMethod · 0.95
deregisterBrokerMethod · 0.95
WrapFunction · 0.85
IsAtLeastMethod · 0.80
IDMethod · 0.80
InitProducerIDMethod · 0.65
PrintfMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected