(t *testing.T)
| 302 | } |
| 303 | |
| 304 | func TestAsyncProducerFailureRetry(t *testing.T) { |
| 305 | seedBroker := NewMockBroker(t, 1) |
| 306 | leader1 := NewMockBroker(t, 2) |
| 307 | leader2 := NewMockBroker(t, 3) |
| 308 | |
| 309 | metadataLeader1 := new(MetadataResponse) |
| 310 | metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID()) |
| 311 | metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError) |
| 312 | seedBroker.Returns(metadataLeader1) |
| 313 | |
| 314 | config := NewTestConfig() |
| 315 | config.Producer.Flush.Messages = 10 |
| 316 | config.Producer.Return.Successes = true |
| 317 | config.Producer.Retry.Backoff = 0 |
| 318 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 319 | if err != nil { |
| 320 | t.Fatal(err) |
| 321 | } |
| 322 | seedBroker.Close() |
| 323 | |
| 324 | for range 10 { |
| 325 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 326 | } |
| 327 | prodNotLeader := new(ProduceResponse) |
| 328 | prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition) |
| 329 | leader1.Returns(prodNotLeader) |
| 330 | |
| 331 | metadataLeader2 := new(MetadataResponse) |
| 332 | metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID()) |
| 333 | metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError) |
| 334 | leader1.Returns(metadataLeader2) |
| 335 | |
| 336 | prodSuccess := new(ProduceResponse) |
| 337 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 338 | leader2.Returns(prodSuccess) |
| 339 | expectResults(t, producer, 10, 0) |
| 340 | leader1.Close() |
| 341 | |
| 342 | for range 10 { |
| 343 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 344 | } |
| 345 | leader2.Returns(prodSuccess) |
| 346 | expectResults(t, producer, 10, 0) |
| 347 | |
| 348 | leader2.Close() |
| 349 | closeProducer(t, producer) |
| 350 | } |
| 351 | |
| 352 | func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) { |
| 353 | tt := func(t *testing.T, kErr KError) { |
nothing calls this directly
no test coverage detected