( t *testing.T, interceptors []ProducerInterceptor, expectationFn func(*testing.T, int, *ProducerMessage), )
| 2119 | func (c *stubLeaderClient) Closed() bool { return false } |
| 2120 | |
| 2121 | func testProducerInterceptor( |
| 2122 | t *testing.T, |
| 2123 | interceptors []ProducerInterceptor, |
| 2124 | expectationFn func(*testing.T, int, *ProducerMessage), |
| 2125 | ) { |
| 2126 | seedBroker := NewMockBroker(t, 1) |
| 2127 | leader := NewMockBroker(t, 2) |
| 2128 | metadataLeader := new(MetadataResponse) |
| 2129 | metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) |
| 2130 | metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 2131 | seedBroker.Returns(metadataLeader) |
| 2132 | |
| 2133 | config := NewTestConfig() |
| 2134 | config.Producer.Flush.Messages = 10 |
| 2135 | config.Producer.Return.Successes = true |
| 2136 | config.Producer.Interceptors = interceptors |
| 2137 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 2138 | if err != nil { |
| 2139 | t.Fatal(err) |
| 2140 | } |
| 2141 | |
| 2142 | for range 10 { |
| 2143 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 2144 | } |
| 2145 | |
| 2146 | prodSuccess := new(ProduceResponse) |
| 2147 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 2148 | leader.Returns(prodSuccess) |
| 2149 | |
| 2150 | for i := range 10 { |
| 2151 | select { |
| 2152 | case msg := <-producer.Errors(): |
| 2153 | t.Error(msg.Err) |
| 2154 | case msg := <-producer.Successes(): |
| 2155 | expectationFn(t, i, msg) |
| 2156 | } |
| 2157 | } |
| 2158 | |
| 2159 | closeProducer(t, producer) |
| 2160 | leader.Close() |
| 2161 | seedBroker.Close() |
| 2162 | } |
| 2163 | |
| 2164 | func TestAsyncProducerInterceptors(t *testing.T) { |
| 2165 | tests := []struct { |
no test coverage detected