(t *testing.T)
| 86 | } |
| 87 | |
| 88 | func TestTxnmgrInitProducerIdTxn(t *testing.T) { |
| 89 | broker := NewMockBroker(t, 1) |
| 90 | defer broker.Close() |
| 91 | |
| 92 | metadataLeader := new(MetadataResponse) |
| 93 | metadataLeader.Version = 4 |
| 94 | metadataLeader.ControllerID = broker.brokerID |
| 95 | metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) |
| 96 | broker.Returns(metadataLeader) |
| 97 | |
| 98 | config := NewTestConfig() |
| 99 | config.Producer.Idempotent = true |
| 100 | config.Producer.Transaction.ID = "test" |
| 101 | config.Version = V0_11_0_0 |
| 102 | config.Producer.RequiredAcks = WaitForAll |
| 103 | config.Net.MaxOpenRequests = 1 |
| 104 | |
| 105 | client, err := NewClient([]string{broker.Addr()}, config) |
| 106 | require.NoError(t, err) |
| 107 | defer client.Close() |
| 108 | |
| 109 | findCoordinatorResponse := FindCoordinatorResponse{ |
| 110 | Coordinator: client.Brokers()[0], |
| 111 | Err: ErrNoError, |
| 112 | Version: 1, |
| 113 | } |
| 114 | broker.Returns(&findCoordinatorResponse) |
| 115 | |
| 116 | producerIdResponse := &InitProducerIDResponse{ |
| 117 | Err: ErrNoError, |
| 118 | ProducerID: 1, |
| 119 | ProducerEpoch: 0, |
| 120 | } |
| 121 | broker.Returns(producerIdResponse) |
| 122 | |
| 123 | txmng, err := newTransactionManager(config, client) |
| 124 | require.NoError(t, err) |
| 125 | |
| 126 | require.Equal(t, int64(1), txmng.producerID) |
| 127 | require.Equal(t, int16(0), txmng.producerEpoch) |
| 128 | require.Equal(t, ProducerTxnFlagReady, txmng.status) |
| 129 | } |
| 130 | |
| 131 | // TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress |
| 132 | func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) { |
nothing calls this directly
no test coverage detected