(t *testing.T)
| 220 | } |
| 221 | |
| 222 | func TestAsyncProducerMultipleBrokers(t *testing.T) { |
| 223 | seedBroker := NewMockBroker(t, 1) |
| 224 | leader0 := NewMockBroker(t, 2) |
| 225 | leader1 := NewMockBroker(t, 3) |
| 226 | |
| 227 | metadataResponse := new(MetadataResponse) |
| 228 | metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID()) |
| 229 | metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID()) |
| 230 | metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError) |
| 231 | metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError) |
| 232 | seedBroker.Returns(metadataResponse) |
| 233 | |
| 234 | prodResponse0 := new(ProduceResponse) |
| 235 | prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError) |
| 236 | leader0.Returns(prodResponse0) |
| 237 | |
| 238 | prodResponse1 := new(ProduceResponse) |
| 239 | prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError) |
| 240 | leader1.Returns(prodResponse1) |
| 241 | |
| 242 | config := NewTestConfig() |
| 243 | config.Producer.Flush.Messages = 5 |
| 244 | config.Producer.Return.Successes = true |
| 245 | config.Producer.Partitioner = NewRoundRobinPartitioner |
| 246 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 247 | if err != nil { |
| 248 | t.Fatal(err) |
| 249 | } |
| 250 | |
| 251 | for range 10 { |
| 252 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 253 | } |
| 254 | expectResults(t, producer, 10, 0) |
| 255 | |
| 256 | closeProducer(t, producer) |
| 257 | leader1.Close() |
| 258 | leader0.Close() |
| 259 | seedBroker.Close() |
| 260 | } |
| 261 | |
| 262 | func TestAsyncProducerCustomPartitioner(t *testing.T) { |
| 263 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected