MCPcopy
hub / github.com/IBM/sarama / TestTxnmgrInitProducerIdTxnCoordinatorLoading

Function TestTxnmgrInitProducerIdTxnCoordinatorLoading

transaction_manager_test.go:132–176  ·  view source on GitHub ↗

TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress

(t *testing.T)

Source from the content-addressed store, hash-verified

130
131// TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress
132func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) {
133 config := NewTestConfig()
134 config.Producer.Idempotent = true
135 config.Producer.Transaction.ID = "txid-group"
136 config.Version = V0_11_0_0
137 config.Producer.RequiredAcks = WaitForAll
138 config.Net.MaxOpenRequests = 1
139
140 broker := NewMockBroker(t, 1)
141 defer broker.Close()
142
143 broker.SetHandlerByMap(map[string]MockResponse{
144 "MetadataRequest": NewMockMetadataResponse(t).
145 SetController(broker.BrokerID()).
146 SetBroker(broker.Addr(), broker.BrokerID()),
147 "FindCoordinatorRequest": NewMockSequence(
148 NewMockFindCoordinatorResponse(t).
149 SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
150 NewMockFindCoordinatorResponse(t).
151 SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
152 NewMockFindCoordinatorResponse(t).
153 SetCoordinator(CoordinatorTransaction, "txid-group", broker),
154 ),
155 "InitProducerIDRequest": NewMockSequence(
156 NewMockInitProducerIDResponse(t).
157 SetError(ErrOffsetsLoadInProgress),
158 NewMockInitProducerIDResponse(t).
159 SetError(ErrOffsetsLoadInProgress),
160 NewMockInitProducerIDResponse(t).
161 SetProducerID(1).
162 SetProducerEpoch(0),
163 ),
164 })
165
166 client, err := NewClient([]string{broker.Addr()}, config)
167 require.NoError(t, err)
168 defer client.Close()
169
170 txmng, err := newTransactionManager(config, client)
171 require.NoError(t, err)
172
173 require.Equal(t, int64(1), txmng.producerID)
174 require.Equal(t, int16(0), txmng.producerEpoch)
175 require.Equal(t, ProducerTxnFlagReady, txmng.status)
176}
177
178func TestMaybeAddPartitionToCurrentTxn(t *testing.T) {
179 type testCase struct {

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
SetHandlerByMapMethod · 0.95
BrokerIDMethod · 0.95
AddrMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockSequenceFunction · 0.85
newTransactionManagerFunction · 0.85
SetBrokerMethod · 0.80

Tested by

no test coverage detected