MCPcopy
hub / github.com/IBM/sarama / TestFuncMultiPartitionProduce

Function TestFuncMultiPartitionProduce

functional_producer_test.go:63–94  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

61}
62
63func TestFuncMultiPartitionProduce(t *testing.T) {
64 setupFunctionalTest(t)
65 defer teardownFunctionalTest(t)
66
67 config := NewFunctionalTestConfig()
68 config.ChannelBufferSize = 20
69 config.Producer.Flush.Frequency = 50 * time.Millisecond
70 config.Producer.Flush.Messages = 200
71 config.Producer.Return.Successes = true
72 producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
73 if err != nil {
74 t.Fatal(err)
75 }
76
77 var wg sync.WaitGroup
78 wg.Add(TestBatchSize)
79
80 for i := 1; i <= TestBatchSize; i++ {
81 go func(i int) {
82 defer wg.Done()
83 msg := &ProducerMessage{Topic: "test.64", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
84 if _, _, err := producer.SendMessage(msg); err != nil {
85 t.Error(i, err)
86 }
87 }(i)
88 }
89
90 wg.Wait()
91 if err := producer.Close(); err != nil {
92 t.Error(err)
93 }
94}
95
96func TestFuncTxnProduceNoBegin(t *testing.T) {
97 checkKafkaVersion(t, "0.11.0.0")

Callers

nothing calls this directly

Calls 11

SendMessageMethod · 0.95
CloseMethod · 0.95
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
FatalMethod · 0.80
NewSyncProducerFunction · 0.70
DoneMethod · 0.65
ErrorMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected