(t *testing.T)
| 260 | } |
| 261 | |
| 262 | func TestAsyncProducerCustomPartitioner(t *testing.T) { |
| 263 | seedBroker := NewMockBroker(t, 1) |
| 264 | leader := NewMockBroker(t, 2) |
| 265 | |
| 266 | metadataResponse := new(MetadataResponse) |
| 267 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 268 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 269 | seedBroker.Returns(metadataResponse) |
| 270 | |
| 271 | prodResponse := new(ProduceResponse) |
| 272 | prodResponse.AddTopicPartition("my_topic", 0, ErrNoError) |
| 273 | leader.Returns(prodResponse) |
| 274 | |
| 275 | config := NewTestConfig() |
| 276 | config.Producer.Flush.Messages = 2 |
| 277 | config.Producer.Return.Successes = true |
| 278 | config.Producer.Partitioner = func(topic string) Partitioner { |
| 279 | p := make(testPartitioner) |
| 280 | go func() { |
| 281 | p.feed(0) |
| 282 | p <- nil |
| 283 | p <- nil |
| 284 | p <- nil |
| 285 | p.feed(0) |
| 286 | }() |
| 287 | return p |
| 288 | } |
| 289 | producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) |
| 290 | if err != nil { |
| 291 | t.Fatal(err) |
| 292 | } |
| 293 | |
| 294 | for range 5 { |
| 295 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 296 | } |
| 297 | expectResults(t, producer, 2, 3) |
| 298 | |
| 299 | closeProducer(t, producer) |
| 300 | leader.Close() |
| 301 | seedBroker.Close() |
| 302 | } |
| 303 | |
| 304 | func TestAsyncProducerFailureRetry(t *testing.T) { |
| 305 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected