If the initial offset passed on partition consumer creation is out of the actual offset range for the partition, then the partition consumer stops immediately closing its output channels.
(t *testing.T)
| 597 | // actual offset range for the partition, then the partition consumer stops |
| 598 | // immediately closing its output channels. |
| 599 | func TestConsumerShutsDownOutOfRange(t *testing.T) { |
| 600 | // Given |
| 601 | broker0 := NewMockBroker(t, 0) |
| 602 | fetchResponse := new(FetchResponse) |
| 603 | fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange) |
| 604 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 605 | "MetadataRequest": NewMockMetadataResponse(t). |
| 606 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 607 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 608 | "OffsetRequest": NewMockOffsetResponse(t). |
| 609 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 610 | SetOffset("my_topic", 0, OffsetOldest, 7), |
| 611 | "FetchRequest": NewMockWrapper(fetchResponse), |
| 612 | }) |
| 613 | |
| 614 | master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 615 | if err != nil { |
| 616 | t.Fatal(err) |
| 617 | } |
| 618 | |
| 619 | // When |
| 620 | consumer, err := master.ConsumePartition("my_topic", 0, 101) |
| 621 | if err != nil { |
| 622 | t.Fatal(err) |
| 623 | } |
| 624 | |
| 625 | // Then: consumer should shut down closing its messages and errors channels. |
| 626 | if _, ok := <-consumer.Messages(); ok { |
| 627 | t.Error("Expected the consumer to shut down") |
| 628 | } |
| 629 | safeClose(t, consumer) |
| 630 | |
| 631 | safeClose(t, master) |
| 632 | broker0.Close() |
| 633 | } |
| 634 | |
| 635 | // If a fetch response contains messages with offsets that are smaller then |
| 636 | // requested, then such messages are ignored. |
nothing calls this directly
no test coverage detected