discover is the entry point of an internal goroutine for the transport which periodically requests updates of the cluster metadata and refreshes the transport cached cluster layout.
(ctx context.Context, wake <-chan event)
| 587 | // periodically requests updates of the cluster metadata and refreshes the |
| 588 | // transport cached cluster layout. |
| 589 | func (p *connPool) discover(ctx context.Context, wake <-chan event) { |
| 590 | prng := rand.New(rand.NewSource(time.Now().UnixNano())) |
| 591 | metadataTTL := func() time.Duration { |
| 592 | return time.Duration(prng.Int63n(int64(p.metadataTTL))) |
| 593 | } |
| 594 | |
| 595 | timer := time.NewTimer(metadataTTL()) |
| 596 | defer timer.Stop() |
| 597 | |
| 598 | var notify event |
| 599 | done := ctx.Done() |
| 600 | |
| 601 | req := &meta.Request{ |
| 602 | TopicNames: p.metadataTopics, |
| 603 | } |
| 604 | |
| 605 | for { |
| 606 | c, err := p.grabClusterConn(ctx) |
| 607 | if err != nil { |
| 608 | p.update(ctx, nil, err) |
| 609 | } else { |
| 610 | res := make(async, 1) |
| 611 | deadline, cancel := context.WithTimeout(ctx, p.metadataTTL) |
| 612 | c.reqs <- connRequest{ |
| 613 | ctx: deadline, |
| 614 | req: req, |
| 615 | res: res, |
| 616 | } |
| 617 | r, err := res.await(deadline) |
| 618 | cancel() |
| 619 | if err != nil && errors.Is(err, ctx.Err()) { |
| 620 | return |
| 621 | } |
| 622 | ret, _ := r.(*meta.Response) |
| 623 | p.update(ctx, ret, err) |
| 624 | } |
| 625 | |
| 626 | if notify != nil { |
| 627 | notify.trigger() |
| 628 | notify = nil |
| 629 | } |
| 630 | |
| 631 | select { |
| 632 | case <-timer.C: |
| 633 | timer.Reset(metadataTTL()) |
| 634 | case <-done: |
| 635 | return |
| 636 | case notify = <-wake: |
| 637 | } |
| 638 | } |
| 639 | } |
| 640 | |
| 641 | // grabBrokerConn returns a connection to a specific broker represented by the |
| 642 | // broker id passed as argument. If the broker id was not known, an error is |