(t *testing.T)
| 185 | } |
| 186 | |
| 187 | func TestAsyncProducerMultipleFlushes(t *testing.T) { |
| 188 | seedBroker := NewMockBroker(t, 1) |
| 189 | leader := NewMockBroker(t, 2) |
| 190 | |
| 191 | metadataResponse := new(MetadataResponse) |
| 192 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 193 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 194 | seedBroker.Returns(metadataResponse) |
| 195 | |
| 196 | prodSuccess := new(ProduceResponse) |
| 197 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 198 | leader.Returns(prodSuccess) |
| 199 | leader.Returns(prodSuccess) |
| 200 | leader.Returns(prodSuccess) |
| 201 | |
| 202 | config := NewTestConfig() |
| 203 | config.Producer.Flush.Messages = 5 |
| 204 | config.Producer.Return.Successes = true |
| 205 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 206 | if err != nil { |
| 207 | t.Fatal(err) |
| 208 | } |
| 209 | |
| 210 | for range 3 { |
| 211 | for range 5 { |
| 212 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 213 | } |
| 214 | expectResults(t, producer, 5, 0) |
| 215 | } |
| 216 | |
| 217 | closeProducer(t, producer) |
| 218 | leader.Close() |
| 219 | seedBroker.Close() |
| 220 | } |
| 221 | |
| 222 | func TestAsyncProducerMultipleBrokers(t *testing.T) { |
| 223 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected