MCPcopy
hub / github.com/segmentio/kafka-go / TestClientTxnOffsetCommit

Function TestClientTxnOffsetCommit

txnoffsetcommit_test.go:14–239  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

12)
13
14func TestClientTxnOffsetCommit(t *testing.T) {
15 if !ktesting.KafkaIsAtLeast("0.11.0") {
16 t.Skip("Skipping test because kafka version is not high enough.")
17 }
18
19 // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
20 // work is revisited.
21 if ktesting.KafkaIsAtLeast("3.0.0") {
22 t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
23 }
24
25 transactionalID := makeTransactionalID()
26 topic := makeTopic()
27
28 client, shutdown := newLocalClientWithTopic(topic, 1)
29 defer shutdown()
30 waitForTopic(context.TODO(), t, topic)
31 defer deleteTopic(t, topic)
32
33 now := time.Now()
34
35 const N = 10
36 records := make([]Record, 0, N)
37 for i := 0; i < N; i++ {
38 records = append(records, Record{
39 Time: now,
40 Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
41 })
42 }
43 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
44 defer cancel()
45 res, err := client.Produce(ctx, &ProduceRequest{
46 Topic: topic,
47 RequiredAcks: RequireAll,
48 Records: NewRecordReader(records...),
49 })
50 if err != nil {
51 t.Fatal(err)
52 }
53
54 if res.Error != nil {
55 t.Error(res.Error)
56 }
57
58 for index, err := range res.RecordErrors {
59 t.Fatalf("record at index %d produced an error: %v", index, err)
60 }
61
62 ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
63 defer cancel()
64 respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
65 Addr: client.Addr,
66 Key: transactionalID,
67 KeyType: CoordinatorKeyTypeTransaction,
68 })
69 if err != nil {
70 t.Fatal(err)
71 }

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
NextMethod · 0.95
makeTransactionalIDFunction · 0.85
waitForTopicFunction · 0.85
deleteTopicFunction · 0.85
makeGroupIDFunction · 0.85
NewConsumerGroupFunction · 0.85
clientEndTxnFunction · 0.85
ProduceMethod · 0.80
InitProducerIDMethod · 0.80
AddPartitionsToTxnMethod · 0.80

Tested by

no test coverage detected