MCPcopy
hub / github.com/IBM/sarama / TestConcurrentSyncProducer

Function TestConcurrentSyncProducer

sync_producer_test.go:213–253  ·  sync_producer_test.go::TestConcurrentSyncProducer
(t *testing.T)

Source from the content-addressed store, hash-verified

211}
212
213func TestConcurrentSyncProducer(t *testing.T) {
214 seedBroker := NewMockBroker(t, 1)
215 leader := NewMockBroker(t, 2)
216
217 metadataResponse := new(MetadataResponse)
218 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
219 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
220 seedBroker.Returns(metadataResponse)
221
222 prodSuccess := new(ProduceResponse)
223 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
224 leader.Returns(prodSuccess)
225
226 config := NewTestConfig()
227 config.Producer.Flush.Messages = 100
228 config.Producer.Return.Successes = true
229 producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
230 if err != nil {
231 t.Fatal(err)
232 }
233
234 wg := sync.WaitGroup{}
235
236 for range 100 {
237 wg.Go(func() {
238 msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
239 partition, _, err := producer.SendMessage(msg)
240 if partition != 0 {
241 t.Error("Unexpected partition")
242 }
243 if err != nil {
244 t.Error(err)
245 }
246 })
247 }
248 wg.Wait()
249
250 safeClose(t, producer)
251 leader.Close()
252 seedBroker.Close()
253}
254
255func TestSyncProducerToNonExistingTopic(t *testing.T) {
256 broker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 14

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
SendMessageMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70
NewSyncProducerFunction · 0.70
safeCloseFunction · 0.70

Tested by

no test coverage detected