TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress
(t *testing.T)
| 130 | |
| 131 | // TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress |
| 132 | func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) { |
| 133 | config := NewTestConfig() |
| 134 | config.Producer.Idempotent = true |
| 135 | config.Producer.Transaction.ID = "txid-group" |
| 136 | config.Version = V0_11_0_0 |
| 137 | config.Producer.RequiredAcks = WaitForAll |
| 138 | config.Net.MaxOpenRequests = 1 |
| 139 | |
| 140 | broker := NewMockBroker(t, 1) |
| 141 | defer broker.Close() |
| 142 | |
| 143 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 144 | "MetadataRequest": NewMockMetadataResponse(t). |
| 145 | SetController(broker.BrokerID()). |
| 146 | SetBroker(broker.Addr(), broker.BrokerID()), |
| 147 | "FindCoordinatorRequest": NewMockSequence( |
| 148 | NewMockFindCoordinatorResponse(t). |
| 149 | SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress), |
| 150 | NewMockFindCoordinatorResponse(t). |
| 151 | SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress), |
| 152 | NewMockFindCoordinatorResponse(t). |
| 153 | SetCoordinator(CoordinatorTransaction, "txid-group", broker), |
| 154 | ), |
| 155 | "InitProducerIDRequest": NewMockSequence( |
| 156 | NewMockInitProducerIDResponse(t). |
| 157 | SetError(ErrOffsetsLoadInProgress), |
| 158 | NewMockInitProducerIDResponse(t). |
| 159 | SetError(ErrOffsetsLoadInProgress), |
| 160 | NewMockInitProducerIDResponse(t). |
| 161 | SetProducerID(1). |
| 162 | SetProducerEpoch(0), |
| 163 | ), |
| 164 | }) |
| 165 | |
| 166 | client, err := NewClient([]string{broker.Addr()}, config) |
| 167 | require.NoError(t, err) |
| 168 | defer client.Close() |
| 169 | |
| 170 | txmng, err := newTransactionManager(config, client) |
| 171 | require.NoError(t, err) |
| 172 | |
| 173 | require.Equal(t, int64(1), txmng.producerID) |
| 174 | require.Equal(t, int16(0), txmng.producerEpoch) |
| 175 | require.Equal(t, ProducerTxnFlagReady, txmng.status) |
| 176 | } |
| 177 | |
| 178 | func TestMaybeAddPartitionToCurrentTxn(t *testing.T) { |
| 179 | type testCase struct { |
nothing calls this directly
no test coverage detected