(t *testing.T)
| 867 | } |
| 868 | |
| 869 | func TestInterceptors(t *testing.T) { |
| 870 | config := NewFunctionalTestConfig() |
| 871 | setupFunctionalTest(t) |
| 872 | defer teardownFunctionalTest(t) |
| 873 | |
| 874 | config.Producer.Return.Successes = true |
| 875 | config.Consumer.Return.Errors = true |
| 876 | config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}} |
| 877 | config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}} |
| 878 | |
| 879 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 880 | if err != nil { |
| 881 | t.Fatal(err) |
| 882 | } |
| 883 | defer safeClose(t, client) |
| 884 | |
| 885 | initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest) |
| 886 | if err != nil { |
| 887 | t.Fatal(err) |
| 888 | } |
| 889 | |
| 890 | producer, err := NewAsyncProducerFromClient(client) |
| 891 | if err != nil { |
| 892 | t.Fatal(err) |
| 893 | } |
| 894 | |
| 895 | for i := 0; i < 10; i++ { |
| 896 | producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)} |
| 897 | } |
| 898 | |
| 899 | for i := 0; i < 10; i++ { |
| 900 | select { |
| 901 | case msg := <-producer.Errors(): |
| 902 | t.Error(msg.Err) |
| 903 | case msg := <-producer.Successes(): |
| 904 | v, _ := msg.Value.Encode() |
| 905 | expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100) |
| 906 | if string(v) != expected { |
| 907 | t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected) |
| 908 | } |
| 909 | } |
| 910 | } |
| 911 | safeClose(t, producer) |
| 912 | |
| 913 | master, err := NewConsumerFromClient(client) |
| 914 | if err != nil { |
| 915 | t.Fatal(err) |
| 916 | } |
| 917 | consumer, err := master.ConsumePartition("test.1", 0, initialOffset) |
| 918 | if err != nil { |
| 919 | t.Fatal(err) |
| 920 | } |
| 921 | |
| 922 | for i := 0; i < 10; i++ { |
| 923 | select { |
| 924 | case <-time.After(10 * time.Second): |
| 925 | t.Fatal("Not received any more events in the last 10 seconds.") |
| 926 | case err := <-consumer.Errors(): |
nothing calls this directly
no test coverage detected