MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnProduceEpochBump

Function TestFuncTxnProduceEpochBump

functional_producer_test.go:274–340  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

272}
273
274func TestFuncTxnProduceEpochBump(t *testing.T) {
275 checkKafkaVersion(t, "2.6.0")
276 setupFunctionalTest(t)
277 defer teardownFunctionalTest(t)
278
279 config := NewFunctionalTestConfig()
280 config.ChannelBufferSize = 20
281 config.Producer.Flush.Frequency = 50 * time.Millisecond
282 config.Producer.Flush.Messages = 200
283 config.Producer.Idempotent = true
284 config.Producer.Transaction.ID = "TestFuncTxnProduceEpochBump"
285 config.Producer.RequiredAcks = WaitForAll
286 config.Producer.Transaction.Retry.Max = 200
287 config.Consumer.IsolationLevel = ReadCommitted
288 config.Net.MaxOpenRequests = 1
289
290 consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
291 require.NoError(t, err)
292 defer consumer.Close()
293
294 pc, err := consumer.ConsumePartition("test.1", 0, OffsetNewest)
295 require.NoError(t, err)
296 msgChannel := pc.Messages()
297 defer pc.Close()
298
299 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
300 require.NoError(t, err)
301 defer nonTransactionalProducer.Close()
302
303 // Ensure consumer is started
304 nonTransactionalProducer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
305 <-msgChannel
306
307 producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
308 require.NoError(t, err)
309 defer producer.Close()
310
311 err = producer.BeginTxn()
312 require.NoError(t, err)
313
314 for i := 0; i < 1; i++ {
315 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
316 }
317
318 err = producer.CommitTxn()
319 require.NoError(t, err)
320
321 for i := 0; i < 1; i++ {
322 msg := <-msgChannel
323 t.Logf("Received %s from %s-%d at offset %d", msg.Value, msg.Topic, msg.Partition, msg.Offset)
324 }
325
326 err = producer.BeginTxn()
327 require.NoError(t, err)
328
329 for i := 0; i < 1; i++ {
330 producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder("test")}
331 }

Callers

nothing calls this directly

Calls 15

Close · Method · 0.95
ConsumePartition · Method · 0.95
Close · Method · 0.95
Input · Method · 0.95
BeginTxn · Method · 0.95
CommitTxn · Method · 0.95
checkKafkaVersion · Function · 0.85
setupFunctionalTest · Function · 0.85
teardownFunctionalTest · Function · 0.85
NewFunctionalTestConfig · Function · 0.85
StringEncoder · TypeAlias · 0.85
NewConsumer · Function · 0.70

Tested by

no test coverage detected