Function TestConsumeMessagesTrackLeader

consumer_test.go:1055–1164

TestConsumeMessagesTrackLeader ensures that in the event that leadership of a topicPartition changes and no preferredReadReplica is specified, the consumer connects back to the new leader to resume consumption and doesn't continue consuming from the follower. See https://github.com/IBM/sarama/issue

(t *testing.T)
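The rule described above can be sketched in isolation. This is a minimal model, not sarama's implementation: the `partitionConsumer` type, the `onMetadataRefresh` method, and the `noPreferredReplica` constant are hypothetical names introduced only for illustration. It assumes that a consumer with a preferred read replica keeps fetching from that replica, and that a consumer without one must follow whichever broker the metadata names as leader. The leader sequence 1, 2, 1 mirrors the three mock metadata responses the test prepares.

```go
package main

import "fmt"

// noPreferredReplica marks that no preferred read replica was specified.
// Hypothetical constant for this sketch only.
const noPreferredReplica int32 = -1

// partitionConsumer is a hypothetical, heavily simplified model of a
// consumer for a single topic partition. It is not sarama's type.
type partitionConsumer struct {
	fetchingFrom     int32 // broker ID currently used for fetches
	preferredReplica int32 // suggested follower, or noPreferredReplica
}

// onMetadataRefresh applies the rule under test: with no preferred read
// replica, fetches go to whichever broker the metadata names as leader.
// It reports whether the consumer switched brokers.
func (pc *partitionConsumer) onMetadataRefresh(leader int32) bool {
	target := leader
	if pc.preferredReplica != noPreferredReplica {
		// Assumption: a preferred replica, when set, takes precedence.
		target = pc.preferredReplica
	}
	if target == pc.fetchingFrom {
		return false
	}
	pc.fetchingFrom = target
	return true
}

func main() {
	pc := &partitionConsumer{fetchingFrom: 1, preferredReplica: noPreferredReplica}
	// Leadership moves 1 -> 2 -> 1, as in the three mock metadata responses.
	for _, leader := range []int32{1, 2, 1} {
		switched := pc.onMetadataRefresh(leader)
		fmt.Printf("leader=%d fetchingFrom=%d switched=%t\n", leader, pc.fetchingFrom, switched)
	}
}
```

In this model the failure the test guards against would appear as `fetchingFrom` staying on the old broker after the metadata names a new leader.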

Source from the content-addressed store, hash-verified

//
// See https://github.com/IBM/sarama/issues/1927
func TestConsumeMessagesTrackLeader(t *testing.T) {
	cfg := NewTestConfig()
	cfg.ClientID = t.Name()
	cfg.Metadata.RefreshFrequency = time.Millisecond * 50
	cfg.Consumer.Retry.Backoff = 0
	cfg.Net.MaxOpenRequests = 1
	cfg.Version = V2_1_0_0

	leader1 := NewMockBroker(t, 1)
	leader2 := NewMockBroker(t, 2)

	mockMetadataResponse1 := NewMockMetadataResponse(t).
		SetBroker(leader1.Addr(), leader1.BrokerID()).
		SetBroker(leader2.Addr(), leader2.BrokerID()).
		SetLeader("my_topic", 0, leader1.BrokerID())
	mockMetadataResponse2 := NewMockMetadataResponse(t).
		SetBroker(leader1.Addr(), leader1.BrokerID()).
		SetBroker(leader2.Addr(), leader2.BrokerID()).
		SetLeader("my_topic", 0, leader2.BrokerID())
	mockMetadataResponse3 := NewMockMetadataResponse(t).
		SetBroker(leader1.Addr(), leader1.BrokerID()).
		SetBroker(leader2.Addr(), leader2.BrokerID()).
		SetLeader("my_topic", 0, leader1.BrokerID())

	leader1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse1,
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1, testMsg).
			SetMessage("my_topic", 0, 2, testMsg),
	})

	leader2.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse1,
	})

	client, err := NewClient([]string{leader1.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	pConsumer, err := consumer.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pConsumer.Messages(), 1)
	assertMessageOffset(t, <-pConsumer.Messages(), 2)

	fetchEmptyResponse := &FetchResponse{Version: 10}
	fetchEmptyResponse.AddError("my_topic", 0, ErrNoError)

Callers

nothing calls this directly

Calls 15

Addr · Method · 0.95
BrokerID · Method · 0.95
SetHandlerByMap · Method · 0.95
AddError · Method · 0.95
Leader · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
NewMockMetadataResponse · Function · 0.85
NewMockOffsetResponse · Function · 0.85
NewMockFetchResponse · Function · 0.85
NewConsumerFromClient · Function · 0.85
assertMessageOffset · Function · 0.85

Tested by

no test coverage detected