MCPcopy
hub / github.com/IBM/sarama / TestInterceptors

Function TestInterceptors

functional_producer_test.go:869–939  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

867}
868
869func TestInterceptors(t *testing.T) {
870 config := NewFunctionalTestConfig()
871 setupFunctionalTest(t)
872 defer teardownFunctionalTest(t)
873
874 config.Producer.Return.Successes = true
875 config.Consumer.Return.Errors = true
876 config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
877 config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}
878
879 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
880 if err != nil {
881 t.Fatal(err)
882 }
883 defer safeClose(t, client)
884
885 initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
886 if err != nil {
887 t.Fatal(err)
888 }
889
890 producer, err := NewAsyncProducerFromClient(client)
891 if err != nil {
892 t.Fatal(err)
893 }
894
895 for i := 0; i < 10; i++ {
896 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
897 }
898
899 for i := 0; i < 10; i++ {
900 select {
901 case msg := <-producer.Errors():
902 t.Error(msg.Err)
903 case msg := <-producer.Successes():
904 v, _ := msg.Value.Encode()
905 expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100)
906 if string(v) != expected {
907 t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
908 }
909 }
910 }
911 safeClose(t, producer)
912
913 master, err := NewConsumerFromClient(client)
914 if err != nil {
915 t.Fatal(err)
916 }
917 consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
918 if err != nil {
919 t.Fatal(err)
920 }
921
922 for i := 0; i < 10; i++ {
923 select {
924 case <-time.After(10 * time.Second):
925 t.Fatal("Not received any more events in the last 10 seconds.")
926 case err := <-consumer.Errors():

Callers

nothing calls this directly

Calls 15

GetOffsetMethod · 0.95
NewFunctionalTestConfigFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
StringEncoderTypeAlias · 0.85
NewConsumerFromClientFunction · 0.85
FatalMethod · 0.80
NewClientFunction · 0.70
safeCloseFunction · 0.70
InputMethod · 0.65
ErrorsMethod · 0.65

Tested by

no test coverage detected