If a Kafka broker becomes unavailable and then returns to service, the producer reconnects to it and continues sending messages.
(t *testing.T)
| 474 | // TestAsyncProducerBrokerBounce checks that if a Kafka broker becomes unavailable and |
| 475 | // then returns to service, the producer reconnects to it and continues sending messages. |
| 476 | func TestAsyncProducerBrokerBounce(t *testing.T) { |
| 477 | // Given: a producer bootstrapped from seedBroker, whose metadata assigns my_topic/0 to leader. |
| 478 | seedBroker := NewMockBroker(t, 1) |
| 479 | leader := NewMockBroker(t, 2) |
| 480 | leaderAddr := leader.Addr() |
| 481 | |
| 482 | metadataResponse := new(MetadataResponse) |
| 483 | metadataResponse.AddBroker(leaderAddr, leader.BrokerID()) |
| 484 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 485 | seedBroker.Returns(metadataResponse) |
| 486 | |
| 487 | prodSuccess := new(ProduceResponse) |
| 488 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 489 | |
| 490 | config := NewTestConfig() |
| 491 | config.Producer.Flush.Messages = 1 |
| 492 | config.Producer.Return.Successes = true |
| 493 | config.Producer.Retry.Backoff = 0 |
| 494 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 495 | if err != nil { |
| 496 | t.Fatal(err) |
| 497 | } |
| 498 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 499 | leader.Returns(prodSuccess) |
| 500 | expectResults(t, producer, 1, 0) |
| 501 | |
| 502 | // When: the leader's connection to the producer is reset (network glitch, restart, you name it). |
| 503 | leader.Close() // producer should get EOF |
| 504 | leader = NewMockBrokerAddr(t, 2, leaderAddr) // restart it right away on the same address |
| 505 | leader.Returns(metadataResponse) // tell the producer to go to broker 2 again |
| 506 | |
| 507 | // Then: a newly produced message goes through the new broker connection. |
| 508 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 509 | leader.Returns(prodSuccess) |
| 510 | expectResults(t, producer, 1, 0) |
| 511 | |
| 512 | closeProducer(t, producer) |
| 513 | seedBroker.Close() |
| 514 | leader.Close() |
| 515 | } |
| 516 | |
| 517 | func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) { |
| 518 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected