MCPcopy
hub / github.com/segmentio/kafka-go / refreshMetadata

Method refreshMetadata

transport.go:450–494  ·  view source on GitHub ↗

refreshMetadata forces an update of the cached cluster metadata, and waits for the given list of topics to appear. This waiting mechanism is necessary to account for the fact that topic creation is asynchronous in Kafka, and causes subsequent requests to fail while the cluster state is propagated to all the brokers.

(ctx context.Context, expectTopics []string)

Source from the content-addressed store, hash-verified

448// causes subsequent requests to fail while the cluster state is propagated to
449// all the brokers.
450func (p *connPool) refreshMetadata(ctx context.Context, expectTopics []string) {
451 minBackoff := 100 * time.Millisecond
452 maxBackoff := 2 * time.Second
453 cancel := ctx.Done()
454
455 for ctx.Err() == nil {
456 notify := make(event)
457 select {
458 case <-cancel:
459 return
460 case p.wake <- notify:
461 select {
462 case <-notify:
463 case <-cancel:
464 return
465 }
466 }
467
468 state := p.grabState()
469 found := 0
470
471 for _, topic := range expectTopics {
472 if _, ok := state.layout.Topics[topic]; ok {
473 found++
474 }
475 }
476
477 if found == len(expectTopics) {
478 return
479 }
480
481 if delay := time.Duration(rand.Int63n(int64(minBackoff))); delay > 0 {
482 timer := time.NewTimer(minBackoff)
483 select {
484 case <-cancel:
485 case <-timer.C:
486 }
487 timer.Stop()
488
489 if minBackoff *= 2; minBackoff > maxBackoff {
490 minBackoff = maxBackoff
491 }
492 }
493 }
494}
495
496func (p *connPool) setReady() {
497 p.once.Do(p.ready.trigger)

Callers 1

roundTripMethod · 0.95

Calls 3

grabStateMethod · 0.95
DoneMethod · 0.80
ErrMethod · 0.45

Tested by

no test coverage detected