TestConsumeMessagesTrackLeader ensures that in the event that leadership of a topicPartition changes and no preferredReadReplica is specified, the consumer connects back to the new leader to resume consumption and doesn't continue consuming from the follower. See https://github.com/IBM/sarama/issues/1927
(t *testing.T)
| 1053 | // |
| 1054 | // See https://github.com/IBM/sarama/issues/1927 |
| 1055 | func TestConsumeMessagesTrackLeader(t *testing.T) { |
| 1056 | cfg := NewTestConfig() |
| 1057 | cfg.ClientID = t.Name() |
| 1058 | cfg.Metadata.RefreshFrequency = time.Millisecond * 50 |
| 1059 | cfg.Consumer.Retry.Backoff = 0 |
| 1060 | cfg.Net.MaxOpenRequests = 1 |
| 1061 | cfg.Version = V2_1_0_0 |
| 1062 | |
| 1063 | leader1 := NewMockBroker(t, 1) |
| 1064 | leader2 := NewMockBroker(t, 2) |
| 1065 | |
| 1066 | mockMetadataResponse1 := NewMockMetadataResponse(t). |
| 1067 | SetBroker(leader1.Addr(), leader1.BrokerID()). |
| 1068 | SetBroker(leader2.Addr(), leader2.BrokerID()). |
| 1069 | SetLeader("my_topic", 0, leader1.BrokerID()) |
| 1070 | mockMetadataResponse2 := NewMockMetadataResponse(t). |
| 1071 | SetBroker(leader1.Addr(), leader1.BrokerID()). |
| 1072 | SetBroker(leader2.Addr(), leader2.BrokerID()). |
| 1073 | SetLeader("my_topic", 0, leader2.BrokerID()) |
| 1074 | mockMetadataResponse3 := NewMockMetadataResponse(t). |
| 1075 | SetBroker(leader1.Addr(), leader1.BrokerID()). |
| 1076 | SetBroker(leader2.Addr(), leader2.BrokerID()). |
| 1077 | SetLeader("my_topic", 0, leader1.BrokerID()) |
| 1078 | |
| 1079 | leader1.SetHandlerByMap(map[string]MockResponse{ |
| 1080 | "MetadataRequest": mockMetadataResponse1, |
| 1081 | "OffsetRequest": NewMockOffsetResponse(t). |
| 1082 | SetOffset("my_topic", 0, OffsetNewest, 1234). |
| 1083 | SetOffset("my_topic", 0, OffsetOldest, 0), |
| 1084 | "FetchRequest": NewMockFetchResponse(t, 1). |
| 1085 | SetMessage("my_topic", 0, 1, testMsg). |
| 1086 | SetMessage("my_topic", 0, 2, testMsg), |
| 1087 | }) |
| 1088 | |
| 1089 | leader2.SetHandlerByMap(map[string]MockResponse{ |
| 1090 | "MetadataRequest": mockMetadataResponse1, |
| 1091 | }) |
| 1092 | |
| 1093 | client, err := NewClient([]string{leader1.Addr()}, cfg) |
| 1094 | if err != nil { |
| 1095 | t.Fatal(err) |
| 1096 | } |
| 1097 | |
| 1098 | consumer, err := NewConsumerFromClient(client) |
| 1099 | if err != nil { |
| 1100 | t.Fatal(err) |
| 1101 | } |
| 1102 | |
| 1103 | pConsumer, err := consumer.ConsumePartition("my_topic", 0, 1) |
| 1104 | if err != nil { |
| 1105 | t.Fatal(err) |
| 1106 | } |
| 1107 | |
| 1108 | assertMessageOffset(t, <-pConsumer.Messages(), 1) |
| 1109 | assertMessageOffset(t, <-pConsumer.Messages(), 2) |
| 1110 | |
| 1111 | fetchEmptyResponse := &FetchResponse{Version: 10} |
| 1112 | fetchEmptyResponse.AddError("my_topic", 0, ErrNoError) |
nothing calls this directly
no test coverage detected