( t *testing.T, version KafkaVersion, txnID, groupID string, addOffsetToTxn func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error, )
| 415 | } |
| 416 | |
| 417 | func testTxnProduceAndCommitOffset( |
| 418 | t *testing.T, |
| 419 | version KafkaVersion, |
| 420 | txnID, groupID string, |
| 421 | addOffsetToTxn func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error, |
| 422 | ) { |
| 423 | setupFunctionalTest(t) |
| 424 | defer teardownFunctionalTest(t) |
| 425 | |
| 426 | config := NewFunctionalTestConfig() |
| 427 | config.Version = version |
| 428 | config.ChannelBufferSize = 20 |
| 429 | config.Producer.Flush.Frequency = 50 * time.Millisecond |
| 430 | config.Producer.Flush.Messages = 200 |
| 431 | config.Producer.Idempotent = true |
| 432 | config.Producer.Transaction.ID = txnID |
| 433 | config.Producer.RequiredAcks = WaitForAll |
| 434 | config.Producer.Transaction.Retry.Max = 200 |
| 435 | config.Consumer.IsolationLevel = ReadCommitted |
| 436 | config.Consumer.Offsets.AutoCommit.Enable = false |
| 437 | config.Net.MaxOpenRequests = 1 |
| 438 | |
| 439 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 440 | require.NoError(t, err) |
| 441 | defer client.Close() |
| 442 | |
| 443 | admin, err := NewClusterAdminFromClient(client) |
| 444 | require.NoError(t, err) |
| 445 | defer admin.Close() |
| 446 | |
| 447 | producer, err := NewAsyncProducerFromClient(client) |
| 448 | require.NoError(t, err) |
| 449 | defer producer.Close() |
| 450 | |
| 451 | cg, err := NewConsumerGroupFromClient(groupID, client) |
| 452 | require.NoError(t, err) |
| 453 | defer cg.Close() |
| 454 | |
| 455 | ctx, cancel := context.WithCancel(context.Background()) |
| 456 | defer cancel() |
| 457 | |
| 458 | handler := &messageHandler{started: make(chan struct{})} |
| 459 | handler.T = t |
| 460 | handler.h = func(sess ConsumerGroupSession, msg *ConsumerMessage) { |
| 461 | require.NoError(t, producer.BeginTxn()) |
| 462 | producer.Input() <- &ProducerMessage{Topic: "test.1", Value: StringEncoder("test-prod")} |
| 463 | require.NoError(t, addOffsetToTxn(producer, sess, msg, groupID)) |
| 464 | require.NoError(t, producer.CommitTxn()) |
| 465 | } |
| 466 | |
| 467 | go func() { |
| 468 | err = cg.Consume(ctx, []string{"test.4"}, handler) |
| 469 | assert.NoError(t, err) |
| 470 | }() |
| 471 | |
| 472 | <-handler.started |
| 473 | |
| 474 | nonTransactionalProducer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
no test coverage detected