MCPcopy
hub / github.com/IBM/sarama / testTxnProduceAndCommitOffset

Function testTxnProduceAndCommitOffset

functional_producer_test.go:417–514  ·  view source on GitHub ↗
(
	t *testing.T,
	version KafkaVersion,
	txnID, groupID string,
	addOffsetToTxn func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error,
)

Source from the content-addressed store, hash-verified

415}
416
417func testTxnProduceAndCommitOffset(
418 t *testing.T,
419 version KafkaVersion,
420 txnID, groupID string,
421 addOffsetToTxn func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error,
422) {
423 setupFunctionalTest(t)
424 defer teardownFunctionalTest(t)
425
426 config := NewFunctionalTestConfig()
427 config.Version = version
428 config.ChannelBufferSize = 20
429 config.Producer.Flush.Frequency = 50 * time.Millisecond
430 config.Producer.Flush.Messages = 200
431 config.Producer.Idempotent = true
432 config.Producer.Transaction.ID = txnID
433 config.Producer.RequiredAcks = WaitForAll
434 config.Producer.Transaction.Retry.Max = 200
435 config.Consumer.IsolationLevel = ReadCommitted
436 config.Consumer.Offsets.AutoCommit.Enable = false
437 config.Net.MaxOpenRequests = 1
438
439 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
440 require.NoError(t, err)
441 defer client.Close()
442
443 admin, err := NewClusterAdminFromClient(client)
444 require.NoError(t, err)
445 defer admin.Close()
446
447 producer, err := NewAsyncProducerFromClient(client)
448 require.NoError(t, err)
449 defer producer.Close()
450
451 cg, err := NewConsumerGroupFromClient(groupID, client)
452 require.NoError(t, err)
453 defer cg.Close()
454
455 ctx, cancel := context.WithCancel(context.Background())
456 defer cancel()
457
458 handler := &messageHandler{started: make(chan struct{})}
459 handler.T = t
460 handler.h = func(sess ConsumerGroupSession, msg *ConsumerMessage) {
461 require.NoError(t, producer.BeginTxn())
462 producer.Input() <- &ProducerMessage{Topic: "test.1", Value: StringEncoder("test-prod")}
463 require.NoError(t, addOffsetToTxn(producer, sess, msg, groupID))
464 require.NoError(t, producer.CommitTxn())
465 }
466
467 go func() {
468 err = cg.Consume(ctx, []string{"test.4"}, handler)
469 assert.NoError(t, err)
470 }()
471
472 <-handler.started
473
474 nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())

Callers 1

Calls 15

Close (Method) · 0.95
Input (Method) · 0.95
Close (Method) · 0.95
RefreshMetadata (Method) · 0.95
GetOffset (Method) · 0.95
setupFunctionalTest (Function) · 0.85
teardownFunctionalTest (Function) · 0.85
NewFunctionalTestConfig (Function) · 0.85
StringEncoder (TypeAlias) · 0.85

Tested by

no test coverage detected