MCPcopy
hub / github.com/segmentio/kafka-go / TestClientInitProducerId

Function TestClientInitProducerId

initproducerid_test.go:14–100  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

12)
13
14func TestClientInitProducerId(t *testing.T) {
15 if !ktesting.KafkaIsAtLeast("0.11.0") {
16 return
17 }
18
19 // TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
20 // work is revisited.
21 if ktesting.KafkaIsAtLeast("3.0.0") {
22 t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
23 }
24
25 client, shutdown := newLocalClient()
26 defer shutdown()
27
28 tid := makeTransactionalID()
29 // Wait for kafka setup and Coordinator to be available.
30 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
31 defer cancel()
32 respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
33 Addr: client.Addr,
34 Key: tid,
35 KeyType: CoordinatorKeyTypeTransaction,
36 })
37 if err != nil {
38 t.Fatal(err)
39 }
40
41 // Now establish a connection with the transaction coordinator
42 transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
43 client, shutdown = newClient(transactionCoordinator)
44 defer shutdown()
45
46 // Check if producer epoch increases and PID remains the same when producer is
47 // initialized again with the same transactionalID
48 resp, err := client.InitProducerID(context.Background(), &InitProducerIDRequest{
49 Addr: transactionCoordinator,
50 TransactionalID: tid,
51 TransactionTimeoutMs: 30000,
52 })
53 if err != nil {
54 t.Fatal(err)
55 }
56
57 if resp.Error != nil {
58 t.Fatal(resp.Error)
59 }
60
61 epoch1 := resp.Producer.ProducerEpoch
62 pid1 := resp.Producer.ProducerID
63
64 resp, err = client.InitProducerID(context.Background(), &InitProducerIDRequest{
65 Addr: transactionCoordinator,
66 TransactionalID: tid,
67 TransactionTimeoutMs: 30000,
68 ProducerID: pid1,
69 ProducerEpoch: epoch1,
70 })
71 if err != nil {

Callers

nothing calls this directly

Calls 6

makeTransactionalIDFunction · 0.85
TCPFunction · 0.85
InitProducerIDMethod · 0.80
newLocalClientFunction · 0.70
newClientFunction · 0.70

Tested by

no test coverage detected