MCP · copy
hub / github.com/IBM/sarama / TestClientCoordinatorChangeWithConsumerOffsetsTopic

Function TestClientCoordinatorChangeWithConsumerOffsetsTopic

client_test.go:1231–1305  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1229}
1230
1231func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) {
1232 seedBroker := NewMockBroker(t, 1)
1233 staleCoordinator := NewMockBroker(t, 2)
1234 freshCoordinator := NewMockBroker(t, 3)
1235
1236 replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
1237 metadataResponse1 := new(MetadataResponse)
1238 metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
1239 metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
1240 metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
1241 seedBroker.Returns(metadataResponse1)
1242
1243 client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
1244 if err != nil {
1245 t.Fatal(err)
1246 }
1247
1248 findCoordinatorResponse := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", staleCoordinator)
1249 staleCoordinator.SetHandlerByMap(map[string]MockResponse{
1250 "FindCoordinatorRequest": findCoordinatorResponse,
1251 })
1252 freshCoordinator.SetHandlerByMap(map[string]MockResponse{
1253 "FindCoordinatorRequest": findCoordinatorResponse,
1254 })
1255 broker, err := client.Coordinator("my_group")
1256 if err != nil {
1257 t.Error(err)
1258 }
1259
1260 if staleCoordinator.Addr() != broker.Addr() {
1261 t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
1262 }
1263
1264 if staleCoordinator.BrokerID() != broker.ID() {
1265 t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
1266 }
1267
1268 // Grab the cached value
1269 broker2, err := client.Coordinator("my_group")
1270 if err != nil {
1271 t.Error(err)
1272 }
1273
1274 if broker2.Addr() != broker.Addr() {
1275 t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
1276 }
1277
1278 findCoordinatorResponse2 := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", freshCoordinator)
1279 staleCoordinator.SetHandlerByMap(map[string]MockResponse{
1280 "FindCoordinatorRequest": findCoordinatorResponse2,
1281 })
1282 freshCoordinator.SetHandlerByMap(map[string]MockResponse{
1283 "FindCoordinatorRequest": findCoordinatorResponse2,
1284 })
1285
1286 // Refresh the locally cached value because it's stale
1287 if err := client.RefreshCoordinator("my_group"); err != nil {
1288 t.Error(err)

Callers

nothing calls this directly

Calls 15

BrokerID · Method · 0.95
Addr · Method · 0.95
Returns · Method · 0.95
SetHandlerByMap · Method · 0.95
Coordinator · Method · 0.95
RefreshCoordinator · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
AddBroker · Method · 0.80
Fatal · Method · 0.80
ID · Method · 0.80

Tested by

no test coverage detected