If leadership for a partition changes, the consumer resolves the new leader and switches to it.
(t *testing.T)
| 1222 | // If leadership for a partition changes, the consumer resolves the new |
| 1223 | // leader and switches to it. |
| 1224 | func TestConsumerRebalancingMultiplePartitions(t *testing.T) { |
| 1225 | // initial setup |
| 1226 | seedBroker := NewMockBroker(t, 10) |
| 1227 | leader0 := NewMockBroker(t, 0) |
| 1228 | leader1 := NewMockBroker(t, 1) |
| 1229 | |
| 1230 | seedBroker.SetHandlerByMap(map[string]MockResponse{ |
| 1231 | "MetadataRequest": NewMockMetadataResponse(t). |
| 1232 | SetBroker(leader0.Addr(), leader0.BrokerID()). |
| 1233 | SetBroker(leader1.Addr(), leader1.BrokerID()). |
| 1234 | SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). |
| 1235 | SetLeader("my_topic", 0, leader0.BrokerID()). |
| 1236 | SetLeader("my_topic", 1, leader1.BrokerID()), |
| 1237 | }) |
| 1238 | |
| 1239 | mockOffsetResponse1 := NewMockOffsetResponse(t). |
| 1240 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 1241 | SetOffset("my_topic", 0, OffsetNewest, 1000). |
| 1242 | SetOffset("my_topic", 1, OffsetOldest, 0). |
| 1243 | SetOffset("my_topic", 1, OffsetNewest, 1000) |
| 1244 | leader0.SetHandlerByMap(map[string]MockResponse{ |
| 1245 | "OffsetRequest": mockOffsetResponse1, |
| 1246 | "FetchRequest": NewMockFetchResponse(t, 1), |
| 1247 | }) |
| 1248 | leader1.SetHandlerByMap(map[string]MockResponse{ |
| 1249 | "OffsetRequest": mockOffsetResponse1, |
| 1250 | "FetchRequest": NewMockFetchResponse(t, 1), |
| 1251 | }) |
| 1252 | |
| 1253 | // launch test goroutines |
| 1254 | config := NewTestConfig() |
| 1255 | config.ClientID = t.Name() |
| 1256 | config.Consumer.Retry.Backoff = 50 |
| 1257 | master, err := NewConsumer([]string{seedBroker.Addr()}, config) |
| 1258 | if err != nil { |
| 1259 | t.Fatal(err) |
| 1260 | } |
| 1261 | |
| 1262 | consumers := map[int32]PartitionConsumer{} |
| 1263 | checkMessage := func(partition int32, offset int) { |
| 1264 | c := consumers[partition] |
| 1265 | message := <-c.Messages() |
| 1266 | t.Logf("Received message my_topic-%d offset=%d", partition, message.Offset) |
| 1267 | if message.Offset != int64(offset) { |
| 1268 | t.Error("Incorrect message offset!", offset, partition, message.Offset) |
| 1269 | } |
| 1270 | if message.Partition != partition { |
| 1271 | t.Error("Incorrect message partition!") |
| 1272 | } |
| 1273 | } |
| 1274 | |
| 1275 | for i := range int32(2) { |
| 1276 | consumer, err := master.ConsumePartition("my_topic", i, 0) |
| 1277 | if err != nil { |
| 1278 | t.Fatal(err) |
| 1279 | } |
| 1280 | |
| 1281 | go func(c PartitionConsumer) { |
nothing calls this directly
no test coverage detected