refreshMetadata forces an update of the cached cluster metadata, and waits for the given list of topics to appear. This waiting mechanism is necessary to account for the fact that topic creation is asynchronous in kafka, and causes subsequent requests to fail while the cluster state is propagated to
(ctx context.Context, expectTopics []string)
| 448 | // causes subsequent requests to fail while the cluster state is propagated to |
| 449 | // all the brokers. |
| 450 | func (p *connPool) refreshMetadata(ctx context.Context, expectTopics []string) { |
| 451 | minBackoff := 100 * time.Millisecond |
| 452 | maxBackoff := 2 * time.Second |
| 453 | cancel := ctx.Done() |
| 454 | |
| 455 | for ctx.Err() == nil { |
| 456 | notify := make(event) |
| 457 | select { |
| 458 | case <-cancel: |
| 459 | return |
| 460 | case p.wake <- notify: |
| 461 | select { |
| 462 | case <-notify: |
| 463 | case <-cancel: |
| 464 | return |
| 465 | } |
| 466 | } |
| 467 | |
| 468 | state := p.grabState() |
| 469 | found := 0 |
| 470 | |
| 471 | for _, topic := range expectTopics { |
| 472 | if _, ok := state.layout.Topics[topic]; ok { |
| 473 | found++ |
| 474 | } |
| 475 | } |
| 476 | |
| 477 | if found == len(expectTopics) { |
| 478 | return |
| 479 | } |
| 480 | |
| 481 | if delay := time.Duration(rand.Int63n(int64(minBackoff))); delay > 0 { |
| 482 | timer := time.NewTimer(minBackoff) |
| 483 | select { |
| 484 | case <-cancel: |
| 485 | case <-timer.C: |
| 486 | } |
| 487 | timer.Stop() |
| 488 | |
| 489 | if minBackoff *= 2; minBackoff > maxBackoff { |
| 490 | minBackoff = maxBackoff |
| 491 | } |
| 492 | } |
| 493 | } |
| 494 | } |
| 495 | |
| 496 | func (p *connPool) setReady() { |
| 497 | p.once.Do(p.ready.trigger) |