(t *testing.T)
| 446 | } |
| 447 | |
| 448 | func TestClientReceivingPartialMetadata(t *testing.T) { |
| 449 | seedBroker := NewMockBroker(t, 1) |
| 450 | leader := NewMockBroker(t, 5) |
| 451 | |
| 452 | metadataResponse1 := new(MetadataResponse) |
| 453 | metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID()) |
| 454 | seedBroker.Returns(metadataResponse1) |
| 455 | |
| 456 | config := NewTestConfig() |
| 457 | config.Metadata.Retry.Max = 0 |
| 458 | client, err := NewClient([]string{seedBroker.Addr()}, config) |
| 459 | if err != nil { |
| 460 | t.Fatal(err) |
| 461 | } |
| 462 | |
| 463 | replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()} |
| 464 | |
| 465 | metadataPartial := new(MetadataResponse) |
| 466 | metadataPartial.AddBroker(leader.Addr(), 5) |
| 467 | metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable) |
| 468 | metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError) |
| 469 | metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable) |
| 470 | leader.Returns(metadataPartial) |
| 471 | |
| 472 | // the leaderless error is now propagated so callers can back off (#3514); |
| 473 | // partial partition data is still cached for the lookups below |
| 474 | if err := client.RefreshMetadata("new_topic"); !errors.Is(err, ErrLeaderNotAvailable) { |
| 475 | t.Error("Expected ErrLeaderNotAvailable, got", err) |
| 476 | } |
| 477 | |
| 478 | // Even though the metadata was incomplete, we should be able to get the leader of a partition |
| 479 | // for which we did get a useful response, without doing additional requests. |
| 480 | |
| 481 | partition0Leader, err := client.Leader("new_topic", 0) |
| 482 | if err != nil { |
| 483 | t.Error(err) |
| 484 | } else if partition0Leader.Addr() != leader.Addr() { |
| 485 | t.Error("Unexpected leader returned", partition0Leader.Addr()) |
| 486 | } |
| 487 | |
| 488 | // If we are asking for the leader of a partition that didn't have a leader before, |
| 489 | // we will do another metadata request. |
| 490 | |
| 491 | leader.Returns(metadataPartial) |
| 492 | |
| 493 | // Still no leader for the partition, so asking for it should return an error. |
| 494 | _, err = client.Leader("new_topic", 1) |
| 495 | if !errors.Is(err, ErrLeaderNotAvailable) { |
| 496 | t.Error("Expected ErrLeaderNotAvailable, got", err) |
| 497 | } |
| 498 | |
| 499 | safeClose(t, client) |
| 500 | seedBroker.Close() |
| 501 | leader.Close() |
| 502 | } |
| 503 | |
| 504 | func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) { |
| 505 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected