MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerRetryShutdown

Function TestAsyncProducerRetryShutdown

async_producer_test.go:1096–1143  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1094}
1095
1096func TestAsyncProducerRetryShutdown(t *testing.T) {
1097 seedBroker := NewMockBroker(t, 1)
1098 leader := NewMockBroker(t, 2)
1099
1100 metadataLeader := new(MetadataResponse)
1101 metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
1102 metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
1103 seedBroker.Returns(metadataLeader)
1104
1105 config := NewTestConfig()
1106 config.Producer.Flush.Messages = 10
1107 config.Producer.Return.Successes = true
1108 config.Producer.Retry.Backoff = 0
1109 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
1110 if err != nil {
1111 t.Fatal(err)
1112 }
1113
1114 for range 10 {
1115 producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
1116 }
1117 producer.AsyncClose()
1118 time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
1119
1120 producer.Input() <- &ProducerMessage{Topic: "FOO"}
1121 if err := <-producer.Errors(); !errors.Is(err.Err, ErrShuttingDown) {
1122 t.Error(err)
1123 }
1124
1125 prodNotLeader := new(ProduceResponse)
1126 prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
1127 leader.Returns(prodNotLeader)
1128
1129 leader.Returns(metadataLeader)
1130
1131 prodSuccess := new(ProduceResponse)
1132 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
1133 leader.Returns(prodSuccess)
1134 expectResults(t, producer, 10, 0)
1135
1136 seedBroker.Close()
1137 leader.Close()
1138
1139 // wait for the async-closed producer to shut down fully
1140 for err := range producer.Errors() {
1141 t.Error(err)
1142 }
1143}
1144
1145func TestAsyncProducerNoReturns(t *testing.T) {
1146 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 15

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
InputMethod · 0.95
AsyncCloseMethod · 0.95
ErrorsMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
expectResultsFunction · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80

Tested by

no test coverage detected