(t *testing.T)
| 1229 | } |
| 1230 | |
| 1231 | func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) { |
| 1232 | seedBroker := NewMockBroker(t, 1) |
| 1233 | staleCoordinator := NewMockBroker(t, 2) |
| 1234 | freshCoordinator := NewMockBroker(t, 3) |
| 1235 | |
| 1236 | replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()} |
| 1237 | metadataResponse1 := new(MetadataResponse) |
| 1238 | metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID()) |
| 1239 | metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID()) |
| 1240 | metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) |
| 1241 | seedBroker.Returns(metadataResponse1) |
| 1242 | |
| 1243 | client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) |
| 1244 | if err != nil { |
| 1245 | t.Fatal(err) |
| 1246 | } |
| 1247 | |
| 1248 | findCoordinatorResponse := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", staleCoordinator) |
| 1249 | staleCoordinator.SetHandlerByMap(map[string]MockResponse{ |
| 1250 | "FindCoordinatorRequest": findCoordinatorResponse, |
| 1251 | }) |
| 1252 | freshCoordinator.SetHandlerByMap(map[string]MockResponse{ |
| 1253 | "FindCoordinatorRequest": findCoordinatorResponse, |
| 1254 | }) |
| 1255 | broker, err := client.Coordinator("my_group") |
| 1256 | if err != nil { |
| 1257 | t.Error(err) |
| 1258 | } |
| 1259 | |
| 1260 | if staleCoordinator.Addr() != broker.Addr() { |
| 1261 | t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr()) |
| 1262 | } |
| 1263 | |
| 1264 | if staleCoordinator.BrokerID() != broker.ID() { |
| 1265 | t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID()) |
| 1266 | } |
| 1267 | |
| 1268 | // Grab the cached value |
| 1269 | broker2, err := client.Coordinator("my_group") |
| 1270 | if err != nil { |
| 1271 | t.Error(err) |
| 1272 | } |
| 1273 | |
| 1274 | if broker2.Addr() != broker.Addr() { |
| 1275 | t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr()) |
| 1276 | } |
| 1277 | |
| 1278 | findCoordinatorResponse2 := NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, "my_group", freshCoordinator) |
| 1279 | staleCoordinator.SetHandlerByMap(map[string]MockResponse{ |
| 1280 | "FindCoordinatorRequest": findCoordinatorResponse2, |
| 1281 | }) |
| 1282 | freshCoordinator.SetHandlerByMap(map[string]MockResponse{ |
| 1283 | "FindCoordinatorRequest": findCoordinatorResponse2, |
| 1284 | }) |
| 1285 | |
| 1286 | // Refresh the locally cached value because it's stale |
| 1287 | if err := client.RefreshCoordinator("my_group"); err != nil { |
| 1288 | t.Error(err) |
nothing calls this directly
no test coverage detected