(t *testing.T)
| 211 | } |
| 212 | |
| 213 | func TestConcurrentSyncProducer(t *testing.T) { |
| 214 | seedBroker := NewMockBroker(t, 1) |
| 215 | leader := NewMockBroker(t, 2) |
| 216 | |
| 217 | metadataResponse := new(MetadataResponse) |
| 218 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 219 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 220 | seedBroker.Returns(metadataResponse) |
| 221 | |
| 222 | prodSuccess := new(ProduceResponse) |
| 223 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 224 | leader.Returns(prodSuccess) |
| 225 | |
| 226 | config := NewTestConfig() |
| 227 | config.Producer.Flush.Messages = 100 |
| 228 | config.Producer.Return.Successes = true |
| 229 | producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) |
| 230 | if err != nil { |
| 231 | t.Fatal(err) |
| 232 | } |
| 233 | |
| 234 | wg := sync.WaitGroup{} |
| 235 | |
| 236 | for range 100 { |
| 237 | wg.Go(func() { |
| 238 | msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)} |
| 239 | partition, _, err := producer.SendMessage(msg) |
| 240 | if partition != 0 { |
| 241 | t.Error("Unexpected partition") |
| 242 | } |
| 243 | if err != nil { |
| 244 | t.Error(err) |
| 245 | } |
| 246 | }) |
| 247 | } |
| 248 | wg.Wait() |
| 249 | |
| 250 | safeClose(t, producer) |
| 251 | leader.Close() |
| 252 | seedBroker.Close() |
| 253 | } |
| 254 | |
| 255 | func TestSyncProducerToNonExistingTopic(t *testing.T) { |
| 256 | broker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected