MCPcopy
hub / github.com/IBM/sarama / TestFuncTxnProduceAndCommitOffset

Function TestFuncTxnProduceAndCommitOffset

functional_producer_test.go:385–415  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

383}
384
385func TestFuncTxnProduceAndCommitOffset(t *testing.T) {
386 checkKafkaVersion(t, "0.11.0.0")
387
388 t.Run("legacy group ID", func(t *testing.T) {
389 testTxnProduceAndCommitOffset(t, MaxVersion, "TestFuncTxnProduceAndCommitOffset", "test-produce",
390 func(producer AsyncProducer, _ ConsumerGroupSession, msg *ConsumerMessage, groupID string) error {
391 return producer.AddMessageToTxn(msg, groupID, nil)
392 })
393 })
394
395 // test the v2/v3 TxnOffsetCommit protocol boundaries across broker versions
396 versions := []struct {
397 name string
398 version KafkaVersion
399 minKafka string
400 }{
401 {"v2", V2_1_0_0, "2.1.0"},
402 {"v3 with group metadata", V2_5_0_0, "2.5.0"},
403 }
404 for _, v := range versions {
405 t.Run(v.name, func(t *testing.T) {
406 checkKafkaVersion(t, v.minKafka)
407 groupID := "test-produce-" + v.version.String()
408 txnID := "TestFuncTxnProduceAndCommitOffset-" + v.version.String()
409 testTxnProduceAndCommitOffset(t, v.version, txnID, groupID,
410 func(producer AsyncProducer, sess ConsumerGroupSession, msg *ConsumerMessage, groupID string) error {
411 return producer.AddMessageToTxnWithGroupMetadata(msg, NewConsumerGroupMetadataFromSession(sess, groupID, nil), nil)
412 })
413 })
414 }
415}
416
417func testTxnProduceAndCommitOffset(
418 t *testing.T,

Callers

nothing calls this directly

Calls 7

checkKafkaVersionFunction · 0.85
RunMethod · 0.80
AddMessageToTxnMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected