MCPcopy
hub / github.com/IBM/sarama / TestConsumerRebalancingMultiplePartitions

Function TestConsumerRebalancingMultiplePartitions

consumer_test.go:1224–1422  ·  consumer_test.go::TestConsumerRebalancingMultiplePartitions

If leadership for a partition changes, the consumer resolves the new leader and switches to it.

(t *testing.T)

Source from the content-addressed store, hash-verified

1222// If leadership for a partition is changing then consumer resolves the new
1223// leader and switches to it.
1224func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
1225 // initial setup
1226 seedBroker := NewMockBroker(t, 10)
1227 leader0 := NewMockBroker(t, 0)
1228 leader1 := NewMockBroker(t, 1)
1229
1230 seedBroker.SetHandlerByMap(map[string]MockResponse{
1231 "MetadataRequest": NewMockMetadataResponse(t).
1232 SetBroker(leader0.Addr(), leader0.BrokerID()).
1233 SetBroker(leader1.Addr(), leader1.BrokerID()).
1234 SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
1235 SetLeader("my_topic", 0, leader0.BrokerID()).
1236 SetLeader("my_topic", 1, leader1.BrokerID()),
1237 })
1238
1239 mockOffsetResponse1 := NewMockOffsetResponse(t).
1240 SetOffset("my_topic", 0, OffsetOldest, 0).
1241 SetOffset("my_topic", 0, OffsetNewest, 1000).
1242 SetOffset("my_topic", 1, OffsetOldest, 0).
1243 SetOffset("my_topic", 1, OffsetNewest, 1000)
1244 leader0.SetHandlerByMap(map[string]MockResponse{
1245 "OffsetRequest": mockOffsetResponse1,
1246 "FetchRequest": NewMockFetchResponse(t, 1),
1247 })
1248 leader1.SetHandlerByMap(map[string]MockResponse{
1249 "OffsetRequest": mockOffsetResponse1,
1250 "FetchRequest": NewMockFetchResponse(t, 1),
1251 })
1252
1253 // launch test goroutines
1254 config := NewTestConfig()
1255 config.ClientID = t.Name()
1256 config.Consumer.Retry.Backoff = 50
1257 master, err := NewConsumer([]string{seedBroker.Addr()}, config)
1258 if err != nil {
1259 t.Fatal(err)
1260 }
1261
1262 consumers := map[int32]PartitionConsumer{}
1263 checkMessage := func(partition int32, offset int) {
1264 c := consumers[partition]
1265 message := <-c.Messages()
1266 t.Logf("Received message my_topic-%d offset=%d", partition, message.Offset)
1267 if message.Offset != int64(offset) {
1268 t.Error("Incorrect message offset!", offset, partition, message.Offset)
1269 }
1270 if message.Partition != partition {
1271 t.Error("Incorrect message partition!")
1272 }
1273 }
1274
1275 for i := range int32(2) {
1276 consumer, err := master.ConsumePartition("my_topic", i, 0)
1277 if err != nil {
1278 t.Fatal(err)
1279 }
1280
1281 go func(c PartitionConsumer) {

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
SetMessageMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
NewMockWrapperFunction · 0.85
NewMockSequenceFunction · 0.85

Tested by

no test coverage detected