MCPcopy
hub / github.com/segmentio/kafka-go / discover

Method discover

transport.go:589–639  ·  view source on GitHub ↗

discover is the entry point of an internal goroutine for the transport, which periodically requests updates of the cluster metadata and refreshes the transport's cached cluster layout.

(ctx context.Context, wake <-chan event)

Source from the content-addressed store, hash-verified

587// periodically requests updates of the cluster metadata and refreshes the
588// transport cached cluster layout.
589func (p *connPool) discover(ctx context.Context, wake <-chan event) {
590 prng := rand.New(rand.NewSource(time.Now().UnixNano()))
591 metadataTTL := func() time.Duration {
592 return time.Duration(prng.Int63n(int64(p.metadataTTL)))
593 }
594
595 timer := time.NewTimer(metadataTTL())
596 defer timer.Stop()
597
598 var notify event
599 done := ctx.Done()
600
601 req := &meta.Request{
602 TopicNames: p.metadataTopics,
603 }
604
605 for {
606 c, err := p.grabClusterConn(ctx)
607 if err != nil {
608 p.update(ctx, nil, err)
609 } else {
610 res := make(async, 1)
611 deadline, cancel := context.WithTimeout(ctx, p.metadataTTL)
612 c.reqs <- connRequest{
613 ctx: deadline,
614 req: req,
615 res: res,
616 }
617 r, err := res.await(deadline)
618 cancel()
619 if err != nil && errors.Is(err, ctx.Err()) {
620 return
621 }
622 ret, _ := r.(*meta.Response)
623 p.update(ctx, ret, err)
624 }
625
626 if notify != nil {
627 notify.trigger()
628 notify = nil
629 }
630
631 select {
632 case <-timer.C:
633 timer.Reset(metadataTTL())
634 case <-done:
635 return
636 case notify = <-wake:
637 }
638 }
639}
640
641// grabBrokerConn returns a connection to a specific broker represented by the
642// broker id passed as argument. If the broker id was not known, an error is

Callers 1

grabPoolMethod · 0.80

Calls 7

grabClusterConnMethod · 0.95
updateMethod · 0.95
triggerMethod · 0.95
DoneMethod · 0.80
awaitMethod · 0.65
ErrMethod · 0.45
ResetMethod · 0.45

Tested by

no test coverage detected