MCPcopy
hub / github.com/IBM/sarama / TestConsumerClosePartitionWithoutLeader

Function TestConsumerClosePartitionWithoutLeader

consumer_test.go:545–594  ·  view source on GitHub ↗

Nothing bad happens if a partition consumer that has no leader assigned at the moment is closed.

(t *testing.T)

Source from the content-addressed store, hash-verified

543// Nothing bad happens if a partition consumer that has no leader assigned at
544// the moment is closed.
545func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
546 // Given
547 broker0 := NewMockBroker(t, 100)
548 broker0.SetHandlerByMap(map[string]MockResponse{
549 "MetadataRequest": NewMockMetadataResponse(t).
550 SetBroker(broker0.Addr(), broker0.BrokerID()).
551 SetLeader("my_topic", 0, broker0.BrokerID()),
552 "OffsetRequest": NewMockOffsetResponse(t).
553 SetOffset("my_topic", 0, OffsetOldest, 123).
554 SetOffset("my_topic", 0, OffsetNewest, 1000),
555 "FetchRequest": NewMockFetchResponse(t, 1).
556 SetMessage("my_topic", 0, 123, testMsg),
557 })
558
559 config := NewTestConfig()
560 config.Net.ReadTimeout = 100 * time.Millisecond
561 config.Consumer.Retry.Backoff = 100 * time.Millisecond
562 config.Consumer.Return.Errors = true
563 config.Metadata.Retry.Max = 0
564 c, err := NewConsumer([]string{broker0.Addr()}, config)
565 if err != nil {
566 t.Fatal(err)
567 }
568
569 pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
570 if err != nil {
571 t.Fatal(err)
572 }
573
574 assertMessageOffset(t, <-pc.Messages(), 123)
575
576 // broker0 says that it is no longer the leader for my_topic/0, but the
577 // requests to retrieve metadata fail with network timeout.
578 fetchResponse2 := &FetchResponse{}
579 fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
580
581 broker0.SetHandlerByMap(map[string]MockResponse{
582 "FetchRequest": NewMockWrapper(fetchResponse2),
583 })
584
585 // When
586 if consErr := <-pc.Errors(); !errors.Is(consErr.Err, ErrOutOfBrokers) {
587 t.Errorf("Unexpected error: %v", consErr.Err)
588 }
589
590 // Then: the partition consumer can be closed without any problem.
591 safeClose(t, pc)
592 safeClose(t, c)
593 broker0.Close()
594}
595
596// If the initial offset passed on partition consumer creation is out of the
597// actual offset range for the partition, then the partition consumer stops

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
AddErrorMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
assertMessageOffsetFunction · 0.85
NewMockWrapperFunction · 0.85

Tested by

no test coverage detected