MCPcopy
hub / github.com/segmentio/kafka-go / TestClientAddPartitionsToTxn

Function TestClientAddPartitionsToTxn

addpartitionstotxn_test.go:13–133  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

11)
12
13func TestClientAddPartitionsToTxn(t *testing.T) {
14 if !ktesting.KafkaIsAtLeast("0.11.0") {
15 t.Skip("Skipping test because kafka version is not high enough.")
16 }
17
18 // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
19 // work is revisited.
20 if ktesting.KafkaIsAtLeast("3.0.0") {
21 t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
22 }
23
24 topic1 := makeTopic()
25 topic2 := makeTopic()
26
27 client, shutdown := newLocalClient()
28 defer shutdown()
29
30 err := clientCreateTopic(client, topic1, 3)
31 if err != nil {
32 t.Fatal(err)
33 }
34
35 err = clientCreateTopic(client, topic2, 3)
36 if err != nil {
37 t.Fatal(err)
38 }
39
40 transactionalID := makeTransactionalID()
41
42 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
43 defer cancel()
44 respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
45 Addr: client.Addr,
46 Key: transactionalID,
47 KeyType: CoordinatorKeyTypeTransaction,
48 })
49 if err != nil {
50 t.Fatal(err)
51 }
52
53 transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
54 client, shutdown = newClient(transactionCoordinator)
55 defer shutdown()
56
57 ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
58 TransactionalID: transactionalID,
59 TransactionTimeoutMs: 10000,
60 })
61 if err != nil {
62 t.Fatal(err)
63 }
64
65 if ipResp.Error != nil {
66 t.Fatal(ipResp.Error)
67 }
68
69 defer func() {
70 err := clientEndTxn(client, &EndTxnRequest{

Callers

nothing calls this directly

Calls 11

makeTransactionalID · Function · 0.85
TCP · Function · 0.85
clientEndTxn · Function · 0.85
InitProducerID · Method · 0.80
AddPartitionsToTxn · Method · 0.80
makeTopic · Function · 0.70
newLocalClient · Function · 0.70
clientCreateTopic · Function · 0.70
newClient · Function · 0.70
Error · Method · 0.45

Tested by

no test coverage detected