MCPcopy
hub / github.com/segmentio/kafka-go / update

Method update

transport.go:503–584  ·  view source on GitHub ↗

update is called periodically by the goroutine running the discover method to refresh the cluster layout information used by the transport to route requests to brokers.

(ctx context.Context, metadata *meta.Response, err error)

Source from the content-addressed store, hash-verified

501// to refresh the cluster layout information used by the transport to route
502// requests to brokers.
503func (p *connPool) update(ctx context.Context, metadata *meta.Response, err error) {
504 var layout protocol.Cluster
505
506 if metadata != nil {
507 metadata.ThrottleTimeMs = 0
508
509 // Normalize the lists so we can apply binary search on them.
510 sortMetadataBrokers(metadata.Brokers)
511 sortMetadataTopics(metadata.Topics)
512
513 for i := range metadata.Topics {
514 t := &metadata.Topics[i]
515 sortMetadataPartitions(t.Partitions)
516 }
517
518 layout = makeLayout(metadata)
519 }
520
521 state := p.grabState()
522 addBrokers := make(map[int32]struct{})
523 delBrokers := make(map[int32]struct{})
524
525 if err != nil {
526 // Only update the error on the transport if the cluster layout was
527 // unknown. This ensures that we prioritize a previously known state
528 // of the cluster to reduce the impact of transient failures.
529 if state.metadata != nil {
530 return
531 }
532 state.err = err
533 } else {
534 for id, b2 := range layout.Brokers {
535 if b1, ok := state.layout.Brokers[id]; !ok {
536 addBrokers[id] = struct{}{}
537 } else if b1 != b2 {
538 addBrokers[id] = struct{}{}
539 delBrokers[id] = struct{}{}
540 }
541 }
542
543 for id := range state.layout.Brokers {
544 if _, ok := layout.Brokers[id]; !ok {
545 delBrokers[id] = struct{}{}
546 }
547 }
548
549 state.metadata, state.layout = metadata, layout
550 state.err = nil
551 }
552
553 defer p.setReady()
554 defer p.setState(state)
555
556 if len(addBrokers) != 0 || len(delBrokers) != 0 {
557 // Only acquire the lock when there is a change of layout. This is an
558 // infrequent event so we don't risk introducing regular contention on
559 // the mutex if we were to lock it on every update.
560 p.mutex.Lock()

Callers 1

discoverMethod · 0.95

Calls 10

grabStateMethod · 0.95
setReadyMethod · 0.95
setStateMethod · 0.95
newBrokerConnGroupMethod · 0.95
sortMetadataBrokersFunction · 0.85
sortMetadataTopicsFunction · 0.85
sortMetadataPartitionsFunction · 0.85
makeLayoutFunction · 0.85
closeIdleConnsMethod · 0.80
ErrMethod · 0.45

Tested by

no test coverage detected