MCPcopy
hub / github.com/IBM/sarama / runConsumerLeaderRefreshErrorTestWithConfig

Function runConsumerLeaderRefreshErrorTestWithConfig

consumer_test.go:415–484  ·  view source on GitHub ↗
(t *testing.T, config *Config)

Source from the content-addressed store, hash-verified

413}
414
415func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
416 // Given
417 broker0 := NewMockBroker(t, 100)
418
419 // Stage 1: my_topic/0 served by broker0
420 Logger.Printf(" STAGE 1")
421
422 broker0.SetHandlerByMap(map[string]MockResponse{
423 "MetadataRequest": NewMockMetadataResponse(t).
424 SetBroker(broker0.Addr(), broker0.BrokerID()).
425 SetLeader("my_topic", 0, broker0.BrokerID()),
426 "OffsetRequest": NewMockOffsetResponse(t).
427 SetOffset("my_topic", 0, OffsetOldest, 123).
428 SetOffset("my_topic", 0, OffsetNewest, 1000),
429 "FetchRequest": NewMockFetchResponse(t, 1).
430 SetMessage("my_topic", 0, 123, testMsg),
431 })
432
433 c, err := NewConsumer([]string{broker0.Addr()}, config)
434 if err != nil {
435 t.Fatal(err)
436 }
437
438 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
439 if err != nil {
440 t.Fatal(err)
441 }
442
443 assertMessageOffset(t, <-pc.Messages(), 123)
444
445 // Stage 2: broker0 says that it is no longer the leader for my_topic/0,
446 // but the requests to retrieve metadata fail with network timeout.
447 Logger.Printf(" STAGE 2")
448
449 fetchResponse2 := &FetchResponse{}
450 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
451
452 broker0.SetHandlerByMap(map[string]MockResponse{
453 "FetchRequest": NewMockWrapper(fetchResponse2),
454 })
455
456 if consErr := <-pc.Errors(); !errors.Is(consErr.Err, ErrOutOfBrokers) {
457 t.Errorf("Unexpected error: %v", consErr.Err)
458 }
459
460 // Stage 3: finally the metadata returned by broker0 tells that broker1 is
461 // a new leader for my_topic/0. Consumption resumes.
462
463 Logger.Printf(" STAGE 3")
464
465 broker1 := NewMockBroker(t, 101)
466
467 broker1.SetHandlerByMap(map[string]MockResponse{
468 "FetchRequest": NewMockFetchResponse(t, 1).
469 SetMessage("my_topic", 0, 124, testMsg),
470 })
471 broker0.SetHandlerByMap(map[string]MockResponse{
472 "MetadataRequest": NewMockMetadataResponse(t).

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
AddErrorMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
NewMockWrapperFunction · 0.85

Tested by

no test coverage detected