(t *testing.T)
| 383 | } |
| 384 | |
| 385 | func TestFuncTxnProduceAndCommitOffset(t *testing.T) { |
| 386 | checkKafkaVersion(t, "0.11.0.0") |
| 387 | |
| 388 | t.Run("legacy group ID", func(t *testing.T) { |
| 389 | testTxnProduceAndCommitOffset(t, MaxVersion, "TestFuncTxnProduceAndCommitOffset", "test-produce", |
| 390 | func(producer AsyncProducer, _ ConsumerGroupSession, msg *ConsumerMessage, groupID string) error { |
| 391 | return producer.AddMessageToTxn(msg, groupID, nil) |
| 392 | }) |
| 393 | }) |
| 394 | |
| 395 | // test the v2/v3 TxnOffsetCommit protocol boundaries across broker versions |
| 396 | versions := []struct { |
| 397 | name string |
| 398 | version KafkaVersion |
| 399 | minKafka string |
| 400 | }{ |
| 401 | {"v2", V2_1_0_0, "2.1.0"}, |
| 402 | {"v3 with group metadata", V2_5_0_0, "2.5.0"}, |
| 403 | } |
| 404 | for _, v := range versions { |
| 405 | t.Run(v.name, func(t *testing.T) { |
| 406 | checkKafkaVersion(t, v.minKafka) |
| 407 | groupID := "test-produce-" + v.version.String() |
| 408 | txnID := "TestFuncTxnProduceAndCommitOffset-" + v.version.String() |
| 409 | testTxnProduceAndCommitOffset(t, v.version, txnID, groupID, |
| 410 | func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error { |
| 411 | return producer.AddMessageToTxnWithGroupMetadata(msg, NewConsumerGroupMetadataFromSession(sess, groupID, nil), nil) |
| 412 | }) |
| 413 | }) |
| 414 | } |
| 415 | } |
| 416 | |
| 417 | func testTxnProduceAndCommitOffset( |
| 418 | t *testing.T, |
nothing calls this directly
no test coverage detected