MCPcopy
hub / github.com/IBM/sarama / TestReadOnlyAndAllCommittedMessages

Function TestReadOnlyAndAllCommittedMessages

functional_consumer_test.go:218–359  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

216}
217
218func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
219 t.Skip("TODO: TestReadOnlyAndAllCommittedMessages is periodically failing inexplicably.")
220 checkKafkaVersion(t, "0.11.0")
221 setupFunctionalTest(t)
222 defer teardownFunctionalTest(t)
223
224 config := NewFunctionalTestConfig()
225 config.ClientID = t.Name()
226 config.Net.MaxOpenRequests = 1
227 config.Consumer.IsolationLevel = ReadCommitted
228 config.Producer.Idempotent = true
229 config.Producer.Return.Successes = true
230 config.Producer.RequiredAcks = WaitForAll
231
232 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
233 if err != nil {
234 t.Fatal(err)
235 }
236 defer client.Close()
237 controller, err := client.Controller()
238 if err != nil {
239 t.Fatal(err)
240 }
241 defer controller.Close()
242
243 transactionalID := strconv.FormatInt(time.Now().UnixNano()/(1<<22), 10)
244
245 var coordinator *Broker
246
247 // find the transaction coordinator
248 for {
249 coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{
250 Version: 2,
251 CoordinatorKey: transactionalID,
252 CoordinatorType: CoordinatorTransaction,
253 })
254 if err != nil {
255 t.Fatal(err)
256 }
257 if coordRes.Err != ErrNoError {
258 continue
259 }
260 if err := coordRes.Coordinator.Open(client.Config()); err != nil {
261 t.Fatal(err)
262 }
263 coordinator = coordRes.Coordinator
264 break
265 }
266
267 // produce some uncommitted messages to the topic
268 pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{
269 TransactionalID: &transactionalID,
270 TransactionTimeout: 10 * time.Second,
271 })
272 if err != nil {
273 t.Fatal(err)
274 }
275 _, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
ControllerMethod · 0.95
ConfigMethod · 0.95
InitProducerIDMethod · 0.95
AddPartitionsToTxnMethod · 0.95
addMethod · 0.95
buildRequestMethod · 0.95
ProduceMethod · 0.95
CloseMethod · 0.95
InputMethod · 0.95
SuccessesMethod · 0.95
EndTxnMethod · 0.95

Tested by

no test coverage detected