update is called periodically by the goroutine running the discover method to refresh the cluster layout information used by the transport to route requests to brokers.
(ctx context.Context, metadata *meta.Response, err error)
| 501 | // to refresh the cluster layout information used by the transport to route |
| 502 | // requests to brokers. |
| 503 | func (p *connPool) update(ctx context.Context, metadata *meta.Response, err error) { |
| 504 | var layout protocol.Cluster |
| 505 | |
| 506 | if metadata != nil { |
| 507 | metadata.ThrottleTimeMs = 0 |
| 508 | |
| 509 | // Normalize the lists so we can apply binary search on them. |
| 510 | sortMetadataBrokers(metadata.Brokers) |
| 511 | sortMetadataTopics(metadata.Topics) |
| 512 | |
| 513 | for i := range metadata.Topics { |
| 514 | t := &metadata.Topics[i] |
| 515 | sortMetadataPartitions(t.Partitions) |
| 516 | } |
| 517 | |
| 518 | layout = makeLayout(metadata) |
| 519 | } |
| 520 | |
| 521 | state := p.grabState() |
| 522 | addBrokers := make(map[int32]struct{}) |
| 523 | delBrokers := make(map[int32]struct{}) |
| 524 | |
| 525 | if err != nil { |
| 526 | // Only update the error on the transport if the cluster layout was |
| 527 | // unknown. This ensures that we prioritize a previously known state |
| 528 | // of the cluster to reduce the impact of transient failures. |
| 529 | if state.metadata != nil { |
| 530 | return |
| 531 | } |
| 532 | state.err = err |
| 533 | } else { |
| 534 | for id, b2 := range layout.Brokers { |
| 535 | if b1, ok := state.layout.Brokers[id]; !ok { |
| 536 | addBrokers[id] = struct{}{} |
| 537 | } else if b1 != b2 { |
| 538 | addBrokers[id] = struct{}{} |
| 539 | delBrokers[id] = struct{}{} |
| 540 | } |
| 541 | } |
| 542 | |
| 543 | for id := range state.layout.Brokers { |
| 544 | if _, ok := layout.Brokers[id]; !ok { |
| 545 | delBrokers[id] = struct{}{} |
| 546 | } |
| 547 | } |
| 548 | |
| 549 | state.metadata, state.layout = metadata, layout |
| 550 | state.err = nil |
| 551 | } |
| 552 | |
| 553 | defer p.setReady() |
| 554 | defer p.setState(state) |
| 555 | |
| 556 | if len(addBrokers) != 0 || len(delBrokers) != 0 { |
| 557 | // Only acquire the lock when there is a change of layout. This is an |
| 558 | // infrequent event, so we avoid the regular contention on the mutex |
| 559 | // that locking it on every update would introduce. |
| 560 | p.mutex.Lock() |
no test coverage detected