test case for https://github.com/IBM/sarama/pull/2378
(t *testing.T)
| 1379 | |
| 1380 | // test case for https://github.com/IBM/sarama/pull/2378 |
| 1381 | func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) { |
| 1382 | broker := NewMockBroker(t, 1) |
| 1383 | |
| 1384 | metadataResponse := &MetadataResponse{ |
| 1385 | Version: 4, |
| 1386 | ControllerID: 1, |
| 1387 | } |
| 1388 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
| 1389 | metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1390 | |
| 1391 | initProducerIDResponse := &InitProducerIDResponse{ |
| 1392 | ThrottleTime: 0, |
| 1393 | ProducerID: 1000, |
| 1394 | ProducerEpoch: 1, |
| 1395 | } |
| 1396 | |
| 1397 | prodNotLeaderResponse := &ProduceResponse{ |
| 1398 | Version: 3, |
| 1399 | ThrottleTime: 0, |
| 1400 | } |
| 1401 | prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas) |
| 1402 | |
| 1403 | handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) { |
| 1404 | switch req.body.key() { |
| 1405 | case 3: |
| 1406 | return metadataResponse |
| 1407 | case 22: |
| 1408 | return initProducerIDResponse |
| 1409 | case 0: // for msg, always return error to trigger retryBatch |
| 1410 | return prodNotLeaderResponse |
| 1411 | } |
| 1412 | return nil |
| 1413 | } |
| 1414 | |
| 1415 | config := NewTestConfig() |
| 1416 | config.Version = V0_11_0_0 |
| 1417 | config.Producer.Idempotent = true |
| 1418 | config.Net.MaxOpenRequests = 1 |
| 1419 | config.Producer.Retry.Max = 1 // set max retry to 1 |
| 1420 | config.Producer.RequiredAcks = WaitForAll |
| 1421 | config.Producer.Return.Successes = true |
| 1422 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 1423 | config.Producer.Retry.Backoff = 100 * time.Millisecond |
| 1424 | |
| 1425 | broker.setHandler(handlerFailBeforeWrite) |
| 1426 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 1427 | if err != nil { |
| 1428 | t.Fatal(err) |
| 1429 | } |
| 1430 | |
| 1431 | for range 3 { |
| 1432 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 1433 | } |
| 1434 | |
| 1435 | go func() { |
| 1436 | for range 7 { |
| 1437 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")} |
| 1438 | time.Sleep(100 * time.Millisecond) |
nothing calls this directly
no test coverage detected