MCPcopy
hub / github.com/IBM/sarama / TestSyncProducerBatch

Function TestSyncProducerBatch

sync_producer_test.go:165–211  ·  sync_producer_test.go::TestSyncProducerBatch
(t *testing.T)

Source from the content-addressed store, hash-verified

163}
164
165func TestSyncProducerBatch(t *testing.T) {
166 seedBroker := NewMockBroker(t, 1)
167 leader := NewMockBroker(t, 2)
168
169 metadataResponse := new(MetadataResponse)
170 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
171 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
172 seedBroker.Returns(metadataResponse)
173
174 prodSuccess := new(ProduceResponse)
175 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
176 leader.Returns(prodSuccess)
177
178 config := NewTestConfig()
179 config.Producer.Flush.Messages = 3
180 config.Producer.Return.Successes = true
181 producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
182 if err != nil {
183 t.Fatal(err)
184 }
185
186 err = producer.SendMessages([]*ProducerMessage{
187 {
188 Topic: "my_topic",
189 Value: StringEncoder(TestMessage),
190 Metadata: "test",
191 },
192 {
193 Topic: "my_topic",
194 Value: StringEncoder(TestMessage),
195 Metadata: "test",
196 },
197 {
198 Topic: "my_topic",
199 Value: StringEncoder(TestMessage),
200 Metadata: "test",
201 },
202 })
203
204 if err != nil {
205 t.Error(err)
206 }
207
208 safeClose(t, producer)
209 leader.Close()
210 seedBroker.Close()
211}
212
213func TestConcurrentSyncProducer(t *testing.T) {
214 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 14

AddrMethod · 0.95
BrokerIDMethod · 0.95
ReturnsMethod · 0.95
SendMessagesMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70
NewSyncProducerFunction · 0.70
safeCloseFunction · 0.70

Tested by

no test coverage detected