(t *testing.T)
| 155 | } |
| 156 | |
| 157 | func TestPauseResumeConsumption(t *testing.T) { |
| 158 | // Given |
| 159 | broker0 := NewMockBroker(t, 0) |
| 160 | |
| 161 | const newestOffsetBroker = 1233 |
| 162 | const maxOffsetBroker = newestOffsetBroker + 10 |
| 163 | offsetBroker := newestOffsetBroker |
| 164 | offsetClient := offsetBroker |
| 165 | |
| 166 | mockFetchResponse := NewMockFetchResponse(t, 1) |
| 167 | mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg) |
| 168 | offsetBroker++ |
| 169 | |
| 170 | brokerResponses := map[string]MockResponse{ |
| 171 | "MetadataRequest": NewMockMetadataResponse(t). |
| 172 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 173 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 174 | "OffsetRequest": NewMockOffsetResponse(t). |
| 175 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 176 | SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)), |
| 177 | "FetchRequest": mockFetchResponse, |
| 178 | } |
| 179 | |
| 180 | broker0.SetHandlerByMap(brokerResponses) |
| 181 | |
| 182 | // When |
| 183 | master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) |
| 184 | if err != nil { |
| 185 | t.Fatal(err) |
| 186 | } |
| 187 | |
| 188 | consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest) |
| 189 | if err != nil { |
| 190 | t.Fatal(err) |
| 191 | } |
| 192 | |
| 193 | // pause the consumption |
| 194 | consumer.Pause() |
| 195 | |
| 196 | // set more msgs on broker |
| 197 | for ; offsetBroker < maxOffsetBroker; offsetBroker++ { |
| 198 | mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg) |
| 199 | } |
| 200 | brokerResponses["FetchRequest"] = mockFetchResponse |
| 201 | broker0.SetHandlerByMap(brokerResponses) |
| 202 | |
| 203 | keepConsuming := true |
| 204 | for keepConsuming { |
| 205 | select { |
| 206 | case message := <-consumer.Messages(): |
| 207 | // only the first msg is expected to be consumed |
| 208 | offsetClient++ |
| 209 | assertMessageOffset(t, message, int64(newestOffsetBroker)) |
| 210 | case err := <-consumer.Errors(): |
| 211 | t.Fatal(err) |
| 212 | case <-time.After(time.Second): |
| 213 | // is expected to time out once the consumption is paused |
| 214 | keepConsuming = false |
nothing calls this directly
no test coverage detected