MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerCustomPartitioner

Function TestAsyncProducerCustomPartitioner

async_producer_test.go:262–302  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

260}
261
262func TestAsyncProducerCustomPartitioner(t *testing.T) {
263 seedBroker := NewMockBroker(t, 1)
264 leader := NewMockBroker(t, 2)
265
266 metadataResponse := new(MetadataResponse)
267 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
268 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
269 seedBroker.Returns(metadataResponse)
270
271 prodResponse := new(ProduceResponse)
272 prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
273 leader.Returns(prodResponse)
274
275 config := NewTestConfig()
276 config.Producer.Flush.Messages = 2
277 config.Producer.Return.Successes = true
278 config.Producer.Partitioner = func(topic string) Partitioner {
279 p := make(testPartitioner)
280 go func() {
281 p.feed(0)
282 p <- nil
283 p <- nil
284 p <- nil
285 p.feed(0)
286 }()
287 return p
288 }
289 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
290 if err != nil {
291 t.Fatal(err)
292 }
293
294 for range 5 {
295 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
296 }
297 expectResults(t, producer, 2, 3)
298
299 closeProducer(t, producer)
300 leader.Close()
301 seedBroker.Close()
302}
303
304func TestAsyncProducerFailureRetry(t *testing.T) {
305 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 15

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
InputMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
expectResultsFunction · 0.85
closeProducerFunction · 0.85
AddBrokerMethod · 0.80
feedMethod · 0.80
FatalMethod · 0.80

Tested by

no test coverage detected