(t *testing.T)
| 687 | } |
| 688 | |
| 689 | func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) { |
| 690 | setupFunctionalTest(t) |
| 691 | defer teardownFunctionalTest(t) |
| 692 | |
| 693 | config := NewFunctionalTestConfig() |
| 694 | config.Producer.Flush.Frequency = 250 * time.Millisecond |
| 695 | config.Producer.Idempotent = true |
| 696 | config.Producer.Timeout = 500 * time.Millisecond |
| 697 | config.Producer.Retry.Max = 1 |
| 698 | config.Producer.Retry.Backoff = 500 * time.Millisecond |
| 699 | config.Producer.Return.Successes = true |
| 700 | config.Producer.Return.Errors = true |
| 701 | config.Producer.RequiredAcks = WaitForAll |
| 702 | config.Net.MaxOpenRequests = 1 |
| 703 | |
| 704 | producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 705 | if err != nil { |
| 706 | t.Fatal(err) |
| 707 | } |
| 708 | defer safeClose(t, producer) |
| 709 | |
| 710 | // Successfully publish a few messages |
| 711 | for i := 0; i < 10; i++ { |
| 712 | _, _, err = producer.SendMessage(&ProducerMessage{ |
| 713 | Topic: "test.1", |
| 714 | Value: StringEncoder(fmt.Sprintf("%d message", i)), |
| 715 | }) |
| 716 | if err != nil { |
| 717 | t.Fatal(err) |
| 718 | } |
| 719 | } |
| 720 | |
| 721 | // Break the brokers by disabling every proxy whose name contains "kafka". |
| 722 | for proxyName, proxy := range FunctionalTestEnv.Proxies { |
| 723 | if !strings.Contains(proxyName, "kafka") { |
| 724 | continue |
| 725 | } |
| 726 | if err := proxy.Disable(); err != nil { |
| 727 | t.Fatal(err) |
| 728 | } |
| 729 | } |
| 730 | |
| 731 | // With the brokers broken, every send below is expected to return an error. |
| 732 | for i := 10; i < 20; i++ { |
| 733 | _, _, err = producer.SendMessage(&ProducerMessage{ |
| 734 | Topic: "test.1", |
| 735 | Value: StringEncoder(fmt.Sprintf("%d message", i)), |
| 736 | }) |
| 737 | if err == nil { |
| 738 | t.Fatal(err) |
| 739 | } |
| 740 | } |
| 741 | |
| 742 | // Now bring the proxy back up |
| 743 | for proxyName, proxy := range FunctionalTestEnv.Proxies { |
| 744 | if !strings.Contains(proxyName, "kafka") { |
| 745 | continue |
| 746 | } |
nothing calls this directly
no test coverage detected