(t *testing.T)
| 1183 | } |
| 1184 | |
| 1185 | func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { |
| 1186 | broker := NewMockBroker(t, 1) |
| 1187 | |
| 1188 | metadataResponse := &MetadataResponse{ |
| 1189 | Version: 4, |
| 1190 | ControllerID: 1, |
| 1191 | } |
| 1192 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
| 1193 | metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1194 | broker.Returns(metadataResponse) |
| 1195 | |
| 1196 | initProducerID := &InitProducerIDResponse{ |
| 1197 | ThrottleTime: 0, |
| 1198 | ProducerID: 1000, |
| 1199 | ProducerEpoch: 1, |
| 1200 | } |
| 1201 | broker.Returns(initProducerID) |
| 1202 | |
| 1203 | config := NewTestConfig() |
| 1204 | config.Producer.Flush.Messages = 10 |
| 1205 | config.Producer.Return.Successes = true |
| 1206 | config.Producer.Retry.Max = 4 |
| 1207 | config.Producer.RequiredAcks = WaitForAll |
| 1208 | config.Producer.Retry.Backoff = 0 |
| 1209 | config.Producer.Idempotent = true |
| 1210 | config.Net.MaxOpenRequests = 1 |
| 1211 | config.Version = V0_11_0_0 |
| 1212 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 1213 | if err != nil { |
| 1214 | t.Fatal(err) |
| 1215 | } |
| 1216 | |
| 1217 | for range 10 { |
| 1218 | producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} |
| 1219 | } |
| 1220 | |
| 1221 | prodSuccess := &ProduceResponse{ |
| 1222 | Version: 3, |
| 1223 | ThrottleTime: 0, |
| 1224 | } |
| 1225 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 1226 | broker.Returns(prodSuccess) |
| 1227 | expectResults(t, producer, 10, 0) |
| 1228 | |
| 1229 | broker.Close() |
| 1230 | closeProducer(t, producer) |
| 1231 | } |
| 1232 | |
| 1233 | func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { |
| 1234 | // Logger = log.New(os.Stderr, "", log.LstdFlags) |
nothing calls this directly
no test coverage detected