If a message is given a key, it can be correctly collected while consuming.
(t *testing.T)
| 91 | |
| 92 | // If a message is given a key, it can be correctly collected while consuming. |
| 93 | func TestConsumerMessageWithKey(t *testing.T) { |
| 94 | // Given |
| 95 | broker0 := NewMockBroker(t, 0) |
| 96 | |
| 97 | manualOffset := int64(1234) |
| 98 | offsetNewest := int64(2345) |
| 99 | offsetNewestAfterFetchRequest := int64(3456) |
| 100 | |
| 101 | mockFetchResponse := NewMockFetchResponse(t, 1) |
| 102 | |
| 103 | // skipped because parseRecords(): offset < child.offset |
| 104 | mockFetchResponse.SetMessageWithKey("my_topic", 0, manualOffset-1, testKey, testMsg) |
| 105 | |
| 106 | for i := range int64(10) { |
| 107 | mockFetchResponse.SetMessageWithKey("my_topic", 0, i+manualOffset, testKey, testMsg) |
| 108 | } |
| 109 | |
| 110 | mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest) |
| 111 | |
| 112 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 113 | "MetadataRequest": NewMockMetadataResponse(t). |
| 114 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 115 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 116 | "OffsetRequest": NewMockOffsetResponse(t). |
| 117 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 118 | SetOffset("my_topic", 0, OffsetNewest, offsetNewest), |
| 119 | "FetchRequest": mockFetchResponse, |
| 120 | }) |
| 121 | |
| 122 | // When |
| 123 | master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 124 | if err != nil { |
| 125 | t.Fatal(err) |
| 126 | } |
| 127 | |
| 128 | consumer, err := master.ConsumePartition("my_topic", 0, manualOffset) |
| 129 | if err != nil { |
| 130 | t.Fatal(err) |
| 131 | } |
| 132 | |
| 133 | // Then |
| 134 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { |
| 135 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) |
| 136 | } |
| 137 | for i := range int64(10) { |
| 138 | select { |
| 139 | case message := <-consumer.Messages(): |
| 140 | assertMessageOffset(t, message, i+manualOffset) |
| 141 | assertMessageKey(t, message, testKey) |
| 142 | assertMessageValue(t, message, testMsg) |
| 143 | case err := <-consumer.Errors(): |
| 144 | t.Error(err) |
| 145 | } |
| 146 | } |
| 147 | |
| 148 | if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { |
| 149 | t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) |
| 150 | } |
nothing calls this directly
no test coverage detected